You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by ro...@apache.org on 2017/11/07 09:24:30 UTC

[sling-org-apache-sling-commons-threads] 07/33: Use thread pool configuration object to be extensible.

This is an automated email from the ASF dual-hosted git repository.

rombert pushed a commit to annotated tag org.apache.sling.commons.threads-2.0.2-incubator
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-commons-threads.git

commit a5761c4442ccf6e02c504b32d03d5826f7fd60ef
Author: Carsten Ziegeler <cz...@apache.org>
AuthorDate: Mon Feb 18 17:13:36 2008 +0000

    Use thread pool configuration object to be extensible.
    
    git-svn-id: https://svn.apache.org/repos/asf/incubator/sling/trunk/sling/threads@628820 13f79535-47bb-0310-9956-ffa450edef68
---
 .../java/org/apache/sling/threads/ThreadPool.java  |   2 +-
 .../org/apache/sling/threads/ThreadPoolConfig.java | 202 +++++++++++++++++++++
 .../apache/sling/threads/ThreadPoolManager.java    |  26 +--
 .../sling/threads/impl/DefaultThreadPool.java      |  78 ++++----
 .../threads/impl/DefaultThreadPoolManager.java     |  65 +------
 .../sling/threads/impl/ExtendedThreadFactory.java  |  30 +--
 6 files changed, 263 insertions(+), 140 deletions(-)

diff --git a/src/main/java/org/apache/sling/threads/ThreadPool.java b/src/main/java/org/apache/sling/threads/ThreadPool.java
index c849ed5..db8eb0b 100644
--- a/src/main/java/org/apache/sling/threads/ThreadPool.java
+++ b/src/main/java/org/apache/sling/threads/ThreadPool.java
@@ -40,5 +40,5 @@ public interface ThreadPool {
      */
     void shutdown();
 
-    int getMaxPoolSize();
+    ThreadPoolConfig getConfiguration();
 }
diff --git a/src/main/java/org/apache/sling/threads/ThreadPoolConfig.java b/src/main/java/org/apache/sling/threads/ThreadPoolConfig.java
new file mode 100644
index 0000000..a5792db
--- /dev/null
+++ b/src/main/java/org/apache/sling/threads/ThreadPoolConfig.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sling.threads;
+
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * The ThreadPool configuration.
+ *
+ * @version $Id$
+ */
+public final class ThreadPoolConfig {
+
+    /** The thread pool policies. */
+    public enum ThreadPoolPolicy {
+        ABORT,
+        DISCARD,
+        DISCARDOLDEST,
+        RUN
+    };
+
+    public enum ThreadPriority {
+        NORM,
+        MIN,
+        MAX
+    };
+
+    /** The min pool size. */
+    private int minPoolSize = 5;
+
+    /** The max pool size. */
+    private int maxPoolSize = 5;
+
+    /** The queue size */
+    private int queueSize = -1;
+
+    /** The keep alive time. */
+    private long  keepAliveTime = 60000L;
+
+    /** The thread pool policy. Default is RUN. */
+    private ThreadPoolPolicy blockPolicy = ThreadPoolPolicy.RUN;
+
+    private  boolean shutdownGraceful = false;
+
+    private  int shutdownWaitTimeMs = -1;
+
+    private  ThreadFactory factory;
+
+    private  ThreadPriority   priority = ThreadPriority.NORM;
+
+    private  boolean isDaemon = false;
+
+    /** Can this configuration still be changed? */
+    private boolean isWritable = true;
+
+    /**
+     * Create a new default configuration.
+     */
+    public ThreadPoolConfig() {
+        // nothing to do
+    }
+
+    /**
+     * Clone an existing configuration
+     * @param copy The config to clone
+     */
+    public ThreadPoolConfig(ThreadPoolConfig copy) {
+        this.minPoolSize = copy.minPoolSize;
+        this.maxPoolSize = copy.maxPoolSize;
+        this.queueSize = copy.queueSize;
+        this.keepAliveTime = copy.keepAliveTime;
+        this.blockPolicy = copy.blockPolicy;
+        this.shutdownGraceful = copy.shutdownGraceful;
+        this.shutdownWaitTimeMs = copy.shutdownWaitTimeMs;
+        this.factory = copy.factory;
+        this.priority = copy.priority;
+        this.isDaemon = copy.isDaemon;
+    }
+
+    protected void checkWritable() {
+        if ( !isWritable ) {
+            throw new IllegalStateException("ThreadPoolConfig is read-only.");
+        }
+    }
+
+    /**
+     * Make the configuration read-only.
+     */
+    public void makeReadOnly() {
+        this.isWritable = false;
+    }
+
+    public int getMinPoolSize() {
+        return minPoolSize;
+    }
+
+    public void setMinPoolSize(int minPoolSize) {
+        this.checkWritable();
+        this.minPoolSize = minPoolSize;
+    }
+
+    public int getMaxPoolSize() {
+        return maxPoolSize;
+    }
+
+    public void setMaxPoolSize(int maxPoolSize) {
+        this.checkWritable();
+        this.maxPoolSize = maxPoolSize;
+    }
+
+    public int getQueueSize() {
+        return queueSize;
+    }
+
+    public void setQueueSize(int queueSize) {
+        this.checkWritable();
+        this.queueSize = queueSize;
+    }
+
+    public long getKeepAliveTime() {
+        return keepAliveTime;
+    }
+
+    public void setKeepAliveTime(long keepAliveTime) {
+        this.checkWritable();
+        this.keepAliveTime = keepAliveTime;
+    }
+
+    public ThreadPoolPolicy getBlockPolicy() {
+        return blockPolicy;
+    }
+
+    public void setBlockPolicy(ThreadPoolPolicy blockPolicy) {
+        this.checkWritable();
+        if ( blockPolicy == null ) { // validate before assigning so a rejected call keeps the previous policy
+            throw new IllegalArgumentException("Policy must not be null.");
+        }
+        this.blockPolicy = blockPolicy;
+    }
+
+    public boolean isShutdownGraceful() {
+        return shutdownGraceful;
+    }
+
+    public void setShutdownGraceful(boolean shutdownGraceful) {
+        this.checkWritable();
+        this.shutdownGraceful = shutdownGraceful;
+    }
+
+    public int getShutdownWaitTimeMs() {
+        return shutdownWaitTimeMs;
+    }
+
+    public void setShutdownWaitTimeMs(int shutdownWaitTimeMs) {
+        this.checkWritable();
+        this.shutdownWaitTimeMs = shutdownWaitTimeMs;
+    }
+
+    public ThreadFactory getFactory() {
+        return factory;
+    }
+
+    public void setFactory(ThreadFactory factory) {
+        this.checkWritable();
+        this.factory = factory;
+    }
+
+    public ThreadPriority getPriority() {
+        return priority;
+    }
+
+    public void setPriority(ThreadPriority priority) {
+        this.checkWritable();
+        if ( priority == null ) {
+            throw new IllegalArgumentException("Priority must not be null.");
+        }
+        this.priority = priority;
+    }
+
+    public boolean isDaemon() {
+        return isDaemon;
+    }
+
+    public void setDaemon(boolean isDaemon) {
+        this.checkWritable();
+        this.isDaemon = isDaemon;
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/sling/threads/ThreadPoolManager.java b/src/main/java/org/apache/sling/threads/ThreadPoolManager.java
index 62610b4..db7c4e6 100644
--- a/src/main/java/org/apache/sling/threads/ThreadPoolManager.java
+++ b/src/main/java/org/apache/sling/threads/ThreadPoolManager.java
@@ -16,7 +16,6 @@
  */
 package org.apache.sling.threads;
 
-import java.util.concurrent.ThreadFactory;
 
 /**
  * The <cod>ThreadPoolManager</code> manages thread pools.
@@ -28,17 +27,6 @@ public interface ThreadPoolManager {
     /** The default thread pool name */
     String DEFAULT_THREADPOOL_NAME = "default";
 
-    /** The thread pool policies. */
-    enum ThreadPoolPolicy {
-        ABORT,
-        DISCARD,
-        DISCARDOLDEST,
-        RUN
-    };
-
-    /** The default policy */
-    ThreadPoolPolicy DEFAULT_BLOCK_POLICY = ThreadPoolPolicy.RUN;
-
     /**
      * Add a new pool.
      * If a pool with the same name already exists, the new pool is not added
@@ -61,18 +49,8 @@ public interface ThreadPoolManager {
      * If a pool with the same name already exists, no new pool is created
      * and <code>null</code> is returned.
      * @param name Name must not be null.
-     * @param blockPolicy The thread pool policy or null for the default.
-     * @param factory A thread factory or null for the default favtory.
+     * @param config The thread pool configuration.
      */
     ThreadPool create(String name,
-                     int   minPoolSize,
-                     int   maxPoolSize,
-                     final int queueSize,
-                     long  keepAliveTime,
-                     ThreadPoolPolicy blockPolicy,
-                     final boolean shutdownGraceful,
-                     final int shutdownWaitTimeMs,
-                     final ThreadFactory factory,
-                     final int   priority,
-                     final boolean isDaemon);
+                      ThreadPoolConfig config);
 }
diff --git a/src/main/java/org/apache/sling/threads/impl/DefaultThreadPool.java b/src/main/java/org/apache/sling/threads/impl/DefaultThreadPool.java
index 37289bd..d66d810 100644
--- a/src/main/java/org/apache/sling/threads/impl/DefaultThreadPool.java
+++ b/src/main/java/org/apache/sling/threads/impl/DefaultThreadPool.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.sling.threads.ThreadPool;
+import org.apache.sling.threads.ThreadPoolConfig;
 import org.apache.sling.threads.ThreadPoolManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,11 +50,7 @@ public class DefaultThreadPool
     /** The executor. */
     protected ThreadPoolExecutor executor;
 
-    /** Should we wait for running jobs to terminate on shutdown ? */
-    protected final boolean shutdownGraceful;
-
-    /** How long to wait for running jobs to terminate on disposition */
-    protected final int shutdownWaitTimeMs;
+    protected final ThreadPoolConfig configuration;
 
     /**
      * Create a new thread pool.
@@ -61,16 +58,7 @@ public class DefaultThreadPool
      *               is used
      */
     public DefaultThreadPool(final String name,
-                             int   minPoolSize,
-                             int   maxPoolSize,
-                             final int queueSize,
-                             long  keepAliveTime,
-                             ThreadPoolManager.ThreadPoolPolicy blockPolicy,
-                             final boolean shutdownGraceful,
-                             final int shutdownWaitTimeMs,
-                             final ThreadFactory factory,
-                             final int   priority,
-                             final boolean isDaemon) {
+                             ThreadPoolConfig origConfig) {
         this.logger.info("ThreadPool [{}] initializing ...", name);
 
         // name
@@ -80,42 +68,41 @@ public class DefaultThreadPool
             this.name = DefaultThreadPoolManager.DEFAULT_THREADPOOL_NAME;
         }
 
+        this.configuration = new ThreadPoolConfig(origConfig);
+
         // factory
         final ThreadFactory delegateThreadFactory;
-        if (factory == null) {
+        if (this.configuration.getFactory() == null) {
             logger.warn("No ThreadFactory is configured. Will use JVM default thread factory."
                 + ExtendedThreadFactory.class.getName());
             delegateThreadFactory = Executors.defaultThreadFactory();
         } else {
-            delegateThreadFactory = factory;
+            delegateThreadFactory = this.configuration.getFactory();
         }
         // Min pool size
-        // make sure we have enough threads for the default thread pool as we
-        // need one for ourself
-        if (DefaultThreadPoolManager.DEFAULT_THREADPOOL_NAME.equals(name)
-            && ((minPoolSize > 0) && (minPoolSize < DefaultThreadPoolManager.DEFAULT_MIN_POOL_SIZE))) {
-            minPoolSize = DefaultThreadPoolManager.DEFAULT_MIN_POOL_SIZE;
-        } else if (minPoolSize < 1) {
-            minPoolSize = 1;
+        if (this.configuration.getMinPoolSize() < 1) {
+            this.configuration.setMinPoolSize(1);
             this.logger.warn("min-pool-size < 1 for pool \"" + name + "\". Set to 1");
         }
         // Max pool size
-        maxPoolSize = (maxPoolSize < 0) ? Integer.MAX_VALUE : maxPoolSize;
+        if ( this.configuration.getMaxPoolSize() < 0 ) {
+            this.configuration.setMaxPoolSize(Integer.MAX_VALUE);
+        }
 
         // Set priority and daemon flag
-        final ExtendedThreadFactory threadFactory = new ExtendedThreadFactory(delegateThreadFactory, priority, isDaemon);
+        final ExtendedThreadFactory threadFactory = new ExtendedThreadFactory(delegateThreadFactory, this.configuration.getPriority(), this.configuration.isDaemon());
 
         // Keep alive time
-        if (keepAliveTime < 0) {
-            keepAliveTime = 1000;
+        if (this.configuration.getKeepAliveTime() < 0) {
+            this.configuration.setKeepAliveTime(1000);
             this.logger.warn("keep-alive-time-ms < 0 for pool \"" + name + "\". Set to 1000");
         }
 
         // Queue
         final BlockingQueue<Runnable> queue;
-        if (queueSize != 0) {
-            if (queueSize > 0) {
-                queue = new java.util.concurrent.ArrayBlockingQueue<Runnable>(queueSize);
+        if (this.configuration.getQueueSize() != 0) {
+            if (this.configuration.getQueueSize() > 0) {
+                queue = new java.util.concurrent.ArrayBlockingQueue<Runnable>(this.configuration.getQueueSize());
             } else {
                 queue = new LinkedBlockingQueue<Runnable>();
             }
@@ -123,11 +110,8 @@ public class DefaultThreadPool
             queue = new SynchronousQueue<Runnable>();
         }
 
-        if ( blockPolicy == null ) {
-            blockPolicy = DefaultThreadPoolManager.DEFAULT_BLOCK_POLICY;
-        }
         RejectedExecutionHandler handler = null;
-        switch (blockPolicy) {
+        switch (this.configuration.getBlockPolicy()) {
             case ABORT :
                 handler = new ThreadPoolExecutor.AbortPolicy();
                 break;
@@ -141,15 +125,14 @@ public class DefaultThreadPool
                 handler = new ThreadPoolExecutor.AbortPolicy();
                 break;
         }
-        this.shutdownGraceful = shutdownGraceful;
-        this.shutdownWaitTimeMs = shutdownWaitTimeMs;
-        this.executor = new ThreadPoolExecutor(minPoolSize,
-                maxPoolSize,
-                keepAliveTime,
+        this.executor = new ThreadPoolExecutor(this.configuration.getMinPoolSize(),
+                this.configuration.getMaxPoolSize(),
+                this.configuration.getKeepAliveTime(),
                 TimeUnit.MILLISECONDS,
                 queue,
                 threadFactory,
                 handler);
+        this.configuration.makeReadOnly();
         this.logger.info("ThreadPool [{}] initialized.", name);
     }
 
@@ -160,11 +143,12 @@ public class DefaultThreadPool
 	    return name;
     }
 
+
     /**
-     * @see org.apache.sling.threads.ThreadPool#getMaxPoolSize()
+     * @see org.apache.sling.threads.ThreadPool#getConfiguration()
      */
-    public int getMaxPoolSize() {
-        return this.executor.getMaximumPoolSize();
+    public ThreadPoolConfig getConfiguration() {
+        return this.configuration;
     }
 
     /**
@@ -186,17 +170,17 @@ public class DefaultThreadPool
      */
     public void shutdown() {
         if ( this.executor != null ) {
-            if (shutdownGraceful) {
+            if (this.configuration.isShutdownGraceful()) {
                 this.executor.shutdown();
             } else {
                 this.executor.shutdownNow();
             }
 
             try {
-                if (this.shutdownWaitTimeMs > 0) {
-                    if (!this.executor.awaitTermination(this.shutdownWaitTimeMs, TimeUnit.MILLISECONDS)) {
+                if (this.configuration.getShutdownWaitTimeMs() > 0) {
+                    if (!this.executor.awaitTermination(this.configuration.getShutdownWaitTimeMs(), TimeUnit.MILLISECONDS)) {
                         logger.warn("running commands have not terminated within "
-                            + this.shutdownWaitTimeMs
+                            + this.configuration.getShutdownWaitTimeMs()
                             + "ms. Will shut them down by interruption");
                         this.executor.shutdownNow();
                     }
diff --git a/src/main/java/org/apache/sling/threads/impl/DefaultThreadPoolManager.java b/src/main/java/org/apache/sling/threads/impl/DefaultThreadPoolManager.java
index fcaa35e..72cdc32 100644
--- a/src/main/java/org/apache/sling/threads/impl/DefaultThreadPoolManager.java
+++ b/src/main/java/org/apache/sling/threads/impl/DefaultThreadPoolManager.java
@@ -18,9 +18,9 @@ package org.apache.sling.threads.impl;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ThreadFactory;
 
 import org.apache.sling.threads.ThreadPool;
+import org.apache.sling.threads.ThreadPoolConfig;
 import org.apache.sling.threads.ThreadPoolManager;
 import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
@@ -37,30 +37,6 @@ import org.slf4j.LoggerFactory;
  */
 public class DefaultThreadPoolManager implements ThreadPoolManager {
 
-    /** The default queue size */
-    protected final static int DEFAULT_QUEUE_SIZE = -1;
-
-    /** The default maximum pool size */
-    protected final static int DEFAULT_MAX_POOL_SIZE = 5;
-
-    /** The default minimum pool size */
-    protected final static int DEFAULT_MIN_POOL_SIZE = 5;
-
-    /** The default thread priority */
-    protected final static int DEFAULT_THREAD_PRIORITY =  Thread.NORM_PRIORITY;
-
-    /** The default daemon mode */
-    protected final static boolean DEFAULT_DAEMON_MODE = false;
-
-    /** The default keep alive time */
-    protected final static long DEFAULT_KEEP_ALIVE_TIME = 60000L;
-
-    /** The default way to shutdown gracefully */
-    protected final static boolean DEFAULT_SHUTDOWN_GRACEFUL = false;
-
-    /** The default shutdown waittime time */
-    protected final static int DEFAULT_SHUTDOWN_WAIT_TIME = -1;
-
     /** By default we use the logger for this class. */
     protected Logger logger = LoggerFactory.getLogger(getClass());
 
@@ -74,16 +50,7 @@ public class DefaultThreadPoolManager implements ThreadPoolManager {
         this.logger.info("Starting thread pool manager.");
         final ThreadPool defaultPool = new DefaultThreadPool(
                     DEFAULT_THREADPOOL_NAME,
-                    DEFAULT_MIN_POOL_SIZE,
-                    DEFAULT_MAX_POOL_SIZE,
-                    DEFAULT_QUEUE_SIZE,
-                    DEFAULT_KEEP_ALIVE_TIME,
-                    DEFAULT_BLOCK_POLICY,
-                    DEFAULT_SHUTDOWN_GRACEFUL,
-                    DEFAULT_SHUTDOWN_WAIT_TIME,
-                    null,
-                    DEFAULT_THREAD_PRIORITY,
-                    DEFAULT_DAEMON_MODE);
+                    new ThreadPoolConfig());
         synchronized ( this.pools ) {
             this.pools.put(defaultPool.getName(), defaultPool);
         }
@@ -141,39 +108,23 @@ public class DefaultThreadPoolManager implements ThreadPoolManager {
     }
 
     /**
-     * @see org.apache.sling.threads.ThreadPoolManager#create(java.lang.String, int, int, int, long, org.apache.sling.threads.ThreadPoolManager.ThreadPoolPolicy, boolean, int, java.util.concurrent.ThreadFactory, int, boolean)
+     * @see org.apache.sling.threads.ThreadPoolManager#create(java.lang.String, org.apache.sling.threads.ThreadPoolConfig)
      */
     public ThreadPool create(String name,
-                             int minPoolSize,
-                             int maxPoolSize,
-                             int queueSize,
-                             long keepAliveTime,
-                             ThreadPoolPolicy blockPolicy,
-                             boolean shutdownGraceful,
-                             int shutdownWaitTimeMs,
-                             ThreadFactory factory,
-                             int priority,
-                             boolean isDaemon) {
+                             ThreadPoolConfig config) {
         if ( name == null ) {
             throw new IllegalArgumentException("Name must not be null.");
         }
+        if ( config == null ) {
+            throw new IllegalArgumentException("Config must not be null.");
+        }
         synchronized ( this.pools ) {
             ThreadPool pool = this.pools.get(name);
             if ( pool != null ) {
                 // pool already exists
                 return null;
             }
-            pool = new DefaultThreadPool(name,
-                                         minPoolSize,
-                                         maxPoolSize,
-                                         queueSize,
-                                         keepAliveTime,
-                                         blockPolicy,
-                                         shutdownGraceful,
-                                         shutdownWaitTimeMs,
-                                         factory,
-                                         priority,
-                                         isDaemon);
+            pool = new DefaultThreadPool(name, config);
             this.pools.put(name, pool);
             return pool;
         }
diff --git a/src/main/java/org/apache/sling/threads/impl/ExtendedThreadFactory.java b/src/main/java/org/apache/sling/threads/impl/ExtendedThreadFactory.java
index 7a030a7..45a17a7 100644
--- a/src/main/java/org/apache/sling/threads/impl/ExtendedThreadFactory.java
+++ b/src/main/java/org/apache/sling/threads/impl/ExtendedThreadFactory.java
@@ -18,12 +18,14 @@ package org.apache.sling.threads.impl;
 
 import java.util.concurrent.ThreadFactory;
 
+import org.apache.sling.threads.ThreadPoolConfig;
+
 
 /**
  * This class is responsible to create new Thread instances.
  * It's a very basic implementation.
  *
- * @version $Id$
+ * @version $Id: DefaultThreadFactory.java 628678 2008-02-18 10:40:12Z cziegeler $
  */
 public final class ExtendedThreadFactory implements ThreadFactory {
 
@@ -39,21 +41,27 @@ public final class ExtendedThreadFactory implements ThreadFactory {
     /**
      * Create a new wrapper for a thread factory handling the
      *
-     * @param priority One of {@link Thread#MIN_PRIORITY}, {@link
-     *        Thread#NORM_PRIORITY}, {@link Thread#MAX_PRIORITY}
+     * @param priority A non null value.
      * @param isDaemon Whether new {@link Thread}s should run as daemons.
      */
     public ExtendedThreadFactory(final ThreadFactory factory,
-                                final int priority,
+                                final ThreadPoolConfig.ThreadPriority priority,
                                 final boolean isDaemon) {
         this.isDaemon = isDaemon;
-        if( ( Thread.MAX_PRIORITY == priority ) ||
-                ( Thread.MIN_PRIORITY == priority ) ||
-                ( Thread.NORM_PRIORITY == priority ) ) {
-                this.priority = priority;
-            } else {
-                throw new IllegalStateException("Unknown priority " + priority);
-            }
+        if ( priority == null ) {
+            throw new IllegalStateException("Priority must not be null.");
+        }
+        switch ( priority ) {
+            case NORM : this.priority = Thread.NORM_PRIORITY;
+                        break;
+            case MIN  : this.priority = Thread.MIN_PRIORITY;
+                        break;
+            case MAX  : this.priority = Thread.MAX_PRIORITY;
+                        break;
+            default: // this can never happen
+                        this.priority = Thread.NORM_PRIORITY;
+                        break;
+        }
         this.factory = factory;
     }
 

-- 
To stop receiving notification emails like this one, please contact
"commits@sling.apache.org" <co...@sling.apache.org>.