You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by ro...@apache.org on 2017/11/07 09:24:30 UTC
[sling-org-apache-sling-commons-threads] 07/33: Use thread pool
configuration object to be extensible.
This is an automated email from the ASF dual-hosted git repository.
rombert pushed a commit to annotated tag org.apache.sling.commons.threads-2.0.2-incubator
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-commons-threads.git
commit a5761c4442ccf6e02c504b32d03d5826f7fd60ef
Author: Carsten Ziegeler <cz...@apache.org>
AuthorDate: Mon Feb 18 17:13:36 2008 +0000
Use thread pool configuration object to be extensible.
git-svn-id: https://svn.apache.org/repos/asf/incubator/sling/trunk/sling/threads@628820 13f79535-47bb-0310-9956-ffa450edef68
---
.../java/org/apache/sling/threads/ThreadPool.java | 2 +-
.../org/apache/sling/threads/ThreadPoolConfig.java | 202 +++++++++++++++++++++
.../apache/sling/threads/ThreadPoolManager.java | 26 +--
.../sling/threads/impl/DefaultThreadPool.java | 78 ++++----
.../threads/impl/DefaultThreadPoolManager.java | 65 +------
.../sling/threads/impl/ExtendedThreadFactory.java | 30 +--
6 files changed, 263 insertions(+), 140 deletions(-)
diff --git a/src/main/java/org/apache/sling/threads/ThreadPool.java b/src/main/java/org/apache/sling/threads/ThreadPool.java
index c849ed5..db8eb0b 100644
--- a/src/main/java/org/apache/sling/threads/ThreadPool.java
+++ b/src/main/java/org/apache/sling/threads/ThreadPool.java
@@ -40,5 +40,5 @@ public interface ThreadPool {
*/
void shutdown();
- int getMaxPoolSize();
+ ThreadPoolConfig getConfiguration();
}
diff --git a/src/main/java/org/apache/sling/threads/ThreadPoolConfig.java b/src/main/java/org/apache/sling/threads/ThreadPoolConfig.java
new file mode 100644
index 0000000..a5792db
--- /dev/null
+++ b/src/main/java/org/apache/sling/threads/ThreadPoolConfig.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sling.threads;
+
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * The ThreadPool configuration.
+ *
+ * @version $Id$
+ */
+public final class ThreadPoolConfig {
+
+ /** The thread pool policies. */
+ public enum ThreadPoolPolicy {
+ ABORT,
+ DISCARD,
+ DISCARDOLDEST,
+ RUN
+ };
+
+ public enum ThreadPriority {
+ NORM,
+ MIN,
+ MAX
+ };
+
+ /** The min pool size. */
+ private int minPoolSize = 5;
+
+ /** The max pool size. */
+ private int maxPoolSize = 5;
+
+ /** The queue size */
+ private int queueSize = -1;
+
+ /** The keep alive time. */
+ private long keepAliveTime = 60000L;
+
+ /** The thread pool policy. Default is RUN. */
+ private ThreadPoolPolicy blockPolicy = ThreadPoolPolicy.RUN;
+
+ private boolean shutdownGraceful = false;
+
+ private int shutdownWaitTimeMs = -1;
+
+ private ThreadFactory factory;
+
+ private ThreadPriority priority = ThreadPriority.NORM;
+
+ private boolean isDaemon = false;
+
+ /** Can this configuration still be changed? */
+ private boolean isWritable = true;
+
+ /**
+ * Create a new default configuration.
+ */
+ public ThreadPoolConfig() {
+ // nothing to do
+ }
+
+ /**
+ * Clone an existing configuration
+ * @param copy The config to clone
+ */
+ public ThreadPoolConfig(ThreadPoolConfig copy) {
+ this.minPoolSize = copy.minPoolSize;
+ this.maxPoolSize = copy.maxPoolSize;
+ this.queueSize = copy.queueSize;
+ this.keepAliveTime = copy.keepAliveTime;
+ this.blockPolicy = copy.blockPolicy;
+ this.shutdownGraceful = copy.shutdownGraceful;
+ this.shutdownWaitTimeMs = copy.shutdownWaitTimeMs;
+ this.factory = copy.factory;
+ this.priority = copy.priority;
+ this.isDaemon = copy.isDaemon;
+ }
+
+ protected void checkWritable() {
+ if ( !isWritable ) {
+ throw new IllegalStateException("ThreadPoolConfig is read-only.");
+ }
+ }
+
+ /**
+ * Make the configuration read-only.
+ */
+ public void makeReadOnly() {
+ this.isWritable = false;
+ }
+
+ public int getMinPoolSize() {
+ return minPoolSize;
+ }
+
+ public void setMinPoolSize(int minPoolSize) {
+ this.checkWritable();
+ this.minPoolSize = minPoolSize;
+ }
+
+ public int getMaxPoolSize() {
+ return maxPoolSize;
+ }
+
+ public void setMaxPoolSize(int maxPoolSize) {
+ this.checkWritable();
+ this.maxPoolSize = maxPoolSize;
+ }
+
+ public int getQueueSize() {
+ return queueSize;
+ }
+
+ public void setQueueSize(int queueSize) {
+ this.checkWritable();
+ this.queueSize = queueSize;
+ }
+
+ public long getKeepAliveTime() {
+ return keepAliveTime;
+ }
+
+ public void setKeepAliveTime(long keepAliveTime) {
+ this.checkWritable();
+ this.keepAliveTime = keepAliveTime;
+ }
+
+ public ThreadPoolPolicy getBlockPolicy() {
+ return blockPolicy;
+ }
+
+ public void setBlockPolicy(ThreadPoolPolicy blockPolicy) {
+ this.checkWritable();
+ if ( blockPolicy == null ) { // validate before assigning so a rejected call cannot leave the field null
+ throw new IllegalArgumentException("Policy must not be null.");
+ }
+ this.blockPolicy = blockPolicy;
+ }
+
+ public boolean isShutdownGraceful() {
+ return shutdownGraceful;
+ }
+
+ public void setShutdownGraceful(boolean shutdownGraceful) {
+ this.checkWritable();
+ this.shutdownGraceful = shutdownGraceful;
+ }
+
+ public int getShutdownWaitTimeMs() {
+ return shutdownWaitTimeMs;
+ }
+
+ public void setShutdownWaitTimeMs(int shutdownWaitTimeMs) {
+ this.checkWritable();
+ this.shutdownWaitTimeMs = shutdownWaitTimeMs;
+ }
+
+ public ThreadFactory getFactory() {
+ return factory;
+ }
+
+ public void setFactory(ThreadFactory factory) {
+ this.checkWritable();
+ this.factory = factory;
+ }
+
+ public ThreadPriority getPriority() {
+ return priority;
+ }
+
+ public void setPriority(ThreadPriority priority) {
+ this.checkWritable();
+ if ( priority == null ) {
+ throw new IllegalArgumentException("Priority must not be null.");
+ }
+ this.priority = priority;
+ }
+
+ public boolean isDaemon() {
+ return isDaemon;
+ }
+
+ public void setDaemon(boolean isDaemon) {
+ this.checkWritable();
+ this.isDaemon = isDaemon;
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/sling/threads/ThreadPoolManager.java b/src/main/java/org/apache/sling/threads/ThreadPoolManager.java
index 62610b4..db7c4e6 100644
--- a/src/main/java/org/apache/sling/threads/ThreadPoolManager.java
+++ b/src/main/java/org/apache/sling/threads/ThreadPoolManager.java
@@ -16,7 +16,6 @@
*/
package org.apache.sling.threads;
-import java.util.concurrent.ThreadFactory;
/**
* The <cod>ThreadPoolManager</code> manages thread pools.
@@ -28,17 +27,6 @@ public interface ThreadPoolManager {
/** The default thread pool name */
String DEFAULT_THREADPOOL_NAME = "default";
- /** The thread pool policies. */
- enum ThreadPoolPolicy {
- ABORT,
- DISCARD,
- DISCARDOLDEST,
- RUN
- };
-
- /** The default policy */
- ThreadPoolPolicy DEFAULT_BLOCK_POLICY = ThreadPoolPolicy.RUN;
-
/**
* Add a new pool.
* If a pool with the same name already exists, the new pool is not added
@@ -61,18 +49,8 @@ public interface ThreadPoolManager {
* If a pool with the same name already exists, no new pool is created
* and <code>null</code> is returned.
* @param name Name must not be null.
- * @param blockPolicy The thread pool policy or null for the default.
- * @param factory A thread factory or null for the default favtory.
+ * @param config The thread pool configuration.
*/
ThreadPool create(String name,
- int minPoolSize,
- int maxPoolSize,
- final int queueSize,
- long keepAliveTime,
- ThreadPoolPolicy blockPolicy,
- final boolean shutdownGraceful,
- final int shutdownWaitTimeMs,
- final ThreadFactory factory,
- final int priority,
- final boolean isDaemon);
+ ThreadPoolConfig config);
}
diff --git a/src/main/java/org/apache/sling/threads/impl/DefaultThreadPool.java b/src/main/java/org/apache/sling/threads/impl/DefaultThreadPool.java
index 37289bd..d66d810 100644
--- a/src/main/java/org/apache/sling/threads/impl/DefaultThreadPool.java
+++ b/src/main/java/org/apache/sling/threads/impl/DefaultThreadPool.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.sling.threads.ThreadPool;
+import org.apache.sling.threads.ThreadPoolConfig;
import org.apache.sling.threads.ThreadPoolManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,11 +50,7 @@ public class DefaultThreadPool
/** The executor. */
protected ThreadPoolExecutor executor;
- /** Should we wait for running jobs to terminate on shutdown ? */
- protected final boolean shutdownGraceful;
-
- /** How long to wait for running jobs to terminate on disposition */
- protected final int shutdownWaitTimeMs;
+ protected final ThreadPoolConfig configuration;
/**
* Create a new thread pool.
@@ -61,16 +58,7 @@ public class DefaultThreadPool
* is used
*/
public DefaultThreadPool(final String name,
- int minPoolSize,
- int maxPoolSize,
- final int queueSize,
- long keepAliveTime,
- ThreadPoolManager.ThreadPoolPolicy blockPolicy,
- final boolean shutdownGraceful,
- final int shutdownWaitTimeMs,
- final ThreadFactory factory,
- final int priority,
- final boolean isDaemon) {
+ ThreadPoolConfig origConfig) {
this.logger.info("ThreadPool [{}] initializing ...", name);
// name
@@ -80,42 +68,41 @@ public class DefaultThreadPool
this.name = DefaultThreadPoolManager.DEFAULT_THREADPOOL_NAME;
}
+ this.configuration = new ThreadPoolConfig(origConfig);
+
// factory
final ThreadFactory delegateThreadFactory;
- if (factory == null) {
+ if (this.configuration.getFactory() == null) {
logger.warn("No ThreadFactory is configured. Will use JVM default thread factory."
+ ExtendedThreadFactory.class.getName());
delegateThreadFactory = Executors.defaultThreadFactory();
} else {
- delegateThreadFactory = factory;
+ delegateThreadFactory = this.configuration.getFactory();
}
// Min pool size
- // make sure we have enough threads for the default thread pool as we
- // need one for ourself
- if (DefaultThreadPoolManager.DEFAULT_THREADPOOL_NAME.equals(name)
- && ((minPoolSize > 0) && (minPoolSize < DefaultThreadPoolManager.DEFAULT_MIN_POOL_SIZE))) {
- minPoolSize = DefaultThreadPoolManager.DEFAULT_MIN_POOL_SIZE;
- } else if (minPoolSize < 1) {
- minPoolSize = 1;
+ if (this.configuration.getMinPoolSize() < 1) {
+ this.configuration.setMinPoolSize(1);
this.logger.warn("min-pool-size < 1 for pool \"" + name + "\". Set to 1");
}
// Max pool size
- maxPoolSize = (maxPoolSize < 0) ? Integer.MAX_VALUE : maxPoolSize;
+ if ( this.configuration.getMaxPoolSize() < 0 ) {
+ this.configuration.setMaxPoolSize(Integer.MAX_VALUE);
+ }
// Set priority and daemon flag
- final ExtendedThreadFactory threadFactory = new ExtendedThreadFactory(delegateThreadFactory, priority, isDaemon);
+ final ExtendedThreadFactory threadFactory = new ExtendedThreadFactory(delegateThreadFactory, this.configuration.getPriority(), this.configuration.isDaemon());
// Keep alive time
- if (keepAliveTime < 0) {
- keepAliveTime = 1000;
+ if (this.configuration.getKeepAliveTime() < 0) {
+ this.configuration.setKeepAliveTime(1000);
this.logger.warn("keep-alive-time-ms < 0 for pool \"" + name + "\". Set to 1000");
}
// Queue
final BlockingQueue<Runnable> queue;
- if (queueSize != 0) {
- if (queueSize > 0) {
- queue = new java.util.concurrent.ArrayBlockingQueue<Runnable>(queueSize);
+ if (this.configuration.getQueueSize() != 0) {
+ if (this.configuration.getQueueSize() > 0) {
+ queue = new java.util.concurrent.ArrayBlockingQueue<Runnable>(this.configuration.getQueueSize());
} else {
queue = new LinkedBlockingQueue<Runnable>();
}
@@ -123,11 +110,8 @@ public class DefaultThreadPool
queue = new SynchronousQueue<Runnable>();
}
- if ( blockPolicy == null ) {
- blockPolicy = DefaultThreadPoolManager.DEFAULT_BLOCK_POLICY;
- }
RejectedExecutionHandler handler = null;
- switch (blockPolicy) {
+ switch (this.configuration.getBlockPolicy()) {
case ABORT :
handler = new ThreadPoolExecutor.AbortPolicy();
break;
@@ -141,15 +125,14 @@ public class DefaultThreadPool
handler = new ThreadPoolExecutor.AbortPolicy();
break;
}
- this.shutdownGraceful = shutdownGraceful;
- this.shutdownWaitTimeMs = shutdownWaitTimeMs;
- this.executor = new ThreadPoolExecutor(minPoolSize,
- maxPoolSize,
- keepAliveTime,
+ this.executor = new ThreadPoolExecutor(this.configuration.getMinPoolSize(),
+ this.configuration.getMaxPoolSize(),
+ this.configuration.getKeepAliveTime(),
TimeUnit.MILLISECONDS,
queue,
threadFactory,
handler);
+ this.configuration.makeReadOnly();
this.logger.info("ThreadPool [{}] initialized.", name);
}
@@ -160,11 +143,12 @@ public class DefaultThreadPool
return name;
}
+
/**
- * @see org.apache.sling.threads.ThreadPool#getMaxPoolSize()
+ * @see org.apache.sling.threads.ThreadPool#getConfiguration()
*/
- public int getMaxPoolSize() {
- return this.executor.getMaximumPoolSize();
+ public ThreadPoolConfig getConfiguration() {
+ return this.configuration;
}
/**
@@ -186,17 +170,17 @@ public class DefaultThreadPool
*/
public void shutdown() {
if ( this.executor != null ) {
- if (shutdownGraceful) {
+ if (this.configuration.isShutdownGraceful()) {
this.executor.shutdown();
} else {
this.executor.shutdownNow();
}
try {
- if (this.shutdownWaitTimeMs > 0) {
- if (!this.executor.awaitTermination(this.shutdownWaitTimeMs, TimeUnit.MILLISECONDS)) {
+ if (this.configuration.getShutdownWaitTimeMs() > 0) {
+ if (!this.executor.awaitTermination(this.configuration.getShutdownWaitTimeMs(), TimeUnit.MILLISECONDS)) {
logger.warn("running commands have not terminated within "
- + this.shutdownWaitTimeMs
+ + this.configuration.getShutdownWaitTimeMs()
+ "ms. Will shut them down by interruption");
this.executor.shutdownNow();
}
diff --git a/src/main/java/org/apache/sling/threads/impl/DefaultThreadPoolManager.java b/src/main/java/org/apache/sling/threads/impl/DefaultThreadPoolManager.java
index fcaa35e..72cdc32 100644
--- a/src/main/java/org/apache/sling/threads/impl/DefaultThreadPoolManager.java
+++ b/src/main/java/org/apache/sling/threads/impl/DefaultThreadPoolManager.java
@@ -18,9 +18,9 @@ package org.apache.sling.threads.impl;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.ThreadFactory;
import org.apache.sling.threads.ThreadPool;
+import org.apache.sling.threads.ThreadPoolConfig;
import org.apache.sling.threads.ThreadPoolManager;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
@@ -37,30 +37,6 @@ import org.slf4j.LoggerFactory;
*/
public class DefaultThreadPoolManager implements ThreadPoolManager {
- /** The default queue size */
- protected final static int DEFAULT_QUEUE_SIZE = -1;
-
- /** The default maximum pool size */
- protected final static int DEFAULT_MAX_POOL_SIZE = 5;
-
- /** The default minimum pool size */
- protected final static int DEFAULT_MIN_POOL_SIZE = 5;
-
- /** The default thread priority */
- protected final static int DEFAULT_THREAD_PRIORITY = Thread.NORM_PRIORITY;
-
- /** The default daemon mode */
- protected final static boolean DEFAULT_DAEMON_MODE = false;
-
- /** The default keep alive time */
- protected final static long DEFAULT_KEEP_ALIVE_TIME = 60000L;
-
- /** The default way to shutdown gracefully */
- protected final static boolean DEFAULT_SHUTDOWN_GRACEFUL = false;
-
- /** The default shutdown waittime time */
- protected final static int DEFAULT_SHUTDOWN_WAIT_TIME = -1;
-
/** By default we use the logger for this class. */
protected Logger logger = LoggerFactory.getLogger(getClass());
@@ -74,16 +50,7 @@ public class DefaultThreadPoolManager implements ThreadPoolManager {
this.logger.info("Starting thread pool manager.");
final ThreadPool defaultPool = new DefaultThreadPool(
DEFAULT_THREADPOOL_NAME,
- DEFAULT_MIN_POOL_SIZE,
- DEFAULT_MAX_POOL_SIZE,
- DEFAULT_QUEUE_SIZE,
- DEFAULT_KEEP_ALIVE_TIME,
- DEFAULT_BLOCK_POLICY,
- DEFAULT_SHUTDOWN_GRACEFUL,
- DEFAULT_SHUTDOWN_WAIT_TIME,
- null,
- DEFAULT_THREAD_PRIORITY,
- DEFAULT_DAEMON_MODE);
+ new ThreadPoolConfig());
synchronized ( this.pools ) {
this.pools.put(defaultPool.getName(), defaultPool);
}
@@ -141,39 +108,23 @@ public class DefaultThreadPoolManager implements ThreadPoolManager {
}
/**
- * @see org.apache.sling.threads.ThreadPoolManager#create(java.lang.String, int, int, int, long, org.apache.sling.threads.ThreadPoolManager.ThreadPoolPolicy, boolean, int, java.util.concurrent.ThreadFactory, int, boolean)
+ * @see org.apache.sling.threads.ThreadPoolManager#create(java.lang.String, org.apache.sling.threads.ThreadPoolConfig)
*/
public ThreadPool create(String name,
- int minPoolSize,
- int maxPoolSize,
- int queueSize,
- long keepAliveTime,
- ThreadPoolPolicy blockPolicy,
- boolean shutdownGraceful,
- int shutdownWaitTimeMs,
- ThreadFactory factory,
- int priority,
- boolean isDaemon) {
+ ThreadPoolConfig config) {
if ( name == null ) {
throw new IllegalArgumentException("Name must not be null.");
}
+ if ( config == null ) {
+ throw new IllegalArgumentException("Config must not be null.");
+ }
synchronized ( this.pools ) {
ThreadPool pool = this.pools.get(name);
if ( pool != null ) {
// pool already exists
return null;
}
- pool = new DefaultThreadPool(name,
- minPoolSize,
- maxPoolSize,
- queueSize,
- keepAliveTime,
- blockPolicy,
- shutdownGraceful,
- shutdownWaitTimeMs,
- factory,
- priority,
- isDaemon);
+ pool = new DefaultThreadPool(name, config);
this.pools.put(name, pool);
return pool;
}
diff --git a/src/main/java/org/apache/sling/threads/impl/ExtendedThreadFactory.java b/src/main/java/org/apache/sling/threads/impl/ExtendedThreadFactory.java
index 7a030a7..45a17a7 100644
--- a/src/main/java/org/apache/sling/threads/impl/ExtendedThreadFactory.java
+++ b/src/main/java/org/apache/sling/threads/impl/ExtendedThreadFactory.java
@@ -18,12 +18,14 @@ package org.apache.sling.threads.impl;
import java.util.concurrent.ThreadFactory;
+import org.apache.sling.threads.ThreadPoolConfig;
+
/**
* This class is responsible to create new Thread instances.
* It's a very basic implementation.
*
- * @version $Id$
+ * @version $Id: DefaultThreadFactory.java 628678 2008-02-18 10:40:12Z cziegeler $
*/
public final class ExtendedThreadFactory implements ThreadFactory {
@@ -39,21 +41,27 @@ public final class ExtendedThreadFactory implements ThreadFactory {
/**
* Create a new wrapper for a thread factory handling the
*
- * @param priority One of {@link Thread#MIN_PRIORITY}, {@link
- * Thread#NORM_PRIORITY}, {@link Thread#MAX_PRIORITY}
+ * @param priority A non null value.
* @param isDaemon Whether new {@link Thread}s should run as daemons.
*/
public ExtendedThreadFactory(final ThreadFactory factory,
- final int priority,
+ final ThreadPoolConfig.ThreadPriority priority,
final boolean isDaemon) {
this.isDaemon = isDaemon;
- if( ( Thread.MAX_PRIORITY == priority ) ||
- ( Thread.MIN_PRIORITY == priority ) ||
- ( Thread.NORM_PRIORITY == priority ) ) {
- this.priority = priority;
- } else {
- throw new IllegalStateException("Unknown priority " + priority);
- }
+ if ( priority == null ) { // a switch on a null enum would otherwise fail with a bare NullPointerException
+ throw new IllegalStateException("Priority must not be null.");
+ }
+ switch ( priority ) {
+ case NORM : this.priority = Thread.NORM_PRIORITY;
+ break;
+ case MIN : this.priority = Thread.MIN_PRIORITY;
+ break;
+ case MAX : this.priority = Thread.MAX_PRIORITY;
+ break;
+ default: // this can never happen
+ this.priority = Thread.NORM_PRIORITY;
+ break;
+ }
this.factory = factory;
}
--
To stop receiving notification emails like this one, please contact
"commits@sling.apache.org" <co...@sling.apache.org>.