You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2008/02/18 10:51:30 UTC

svn commit: r628671 - in /incubator/sling/trunk/sling: event/src/main/java/org/apache/sling/event/impl/ event/src/test/java/org/apache/sling/event/impl/ threads/src/main/java/org/apache/sling/threads/ threads/src/main/java/org/apache/sling/threads/impl/

Author: cziegeler
Date: Mon Feb 18 01:51:26 2008
New Revision: 628671

URL: http://svn.apache.org/viewvc?rev=628671&view=rev
Log:
Create own thread pool for eventing.

Modified:
    incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/AbstractRepositoryEventHandler.java
    incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/EventHelper.java
    incubator/sling/trunk/sling/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java
    incubator/sling/trunk/sling/threads/src/main/java/org/apache/sling/threads/ThreadPoolManager.java
    incubator/sling/trunk/sling/threads/src/main/java/org/apache/sling/threads/impl/DefaultThreadPool.java
    incubator/sling/trunk/sling/threads/src/main/java/org/apache/sling/threads/impl/DefaultThreadPoolManager.java

Modified: incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/AbstractRepositoryEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/AbstractRepositoryEventHandler.java?rev=628671&r1=628670&r2=628671&view=diff
==============================================================================
--- incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/AbstractRepositoryEventHandler.java (original)
+++ incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/AbstractRepositoryEventHandler.java Mon Feb 18 01:51:26 2008
@@ -118,7 +118,10 @@
             if ( threadPoolManager == null ) {
                 throw new Exception("No ThreadPoolManager found.");
             }
-            this.threadPool = threadPoolManager.get("SLING_EVENTING");
+            threadPoolManager.create(EventHelper.THREAD_POOL_NAME,
+                                     10, 30, 30, -1,
+                                     ThreadPoolManager.DEFAULT_BLOCK_POLICY, true, 5000, null, 0, false);
+            this.threadPool = threadPoolManager.get(EventHelper.THREAD_POOL_NAME);
             if ( this.threadPool == null ) {
                 throw new Exception("No thread pool found.");
             }

Modified: incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/EventHelper.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/EventHelper.java?rev=628671&r1=628670&r2=628671&view=diff
==============================================================================
--- incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/EventHelper.java (original)
+++ incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/EventHelper.java Mon Feb 18 01:51:26 2008
@@ -24,6 +24,8 @@
  */
 public abstract class EventHelper {
 
+    public static final String THREAD_POOL_NAME = "SLING_EVENTING";
+
     public static final String NODE_PROPERTY_TOPIC = "slingevent:topic";
     public static final String NODE_PROPERTY_APPLICATION = "slingevent:application";
     public static final String NODE_PROPERTY_CREATED = "slingevent:created";

Modified: incubator/sling/trunk/sling/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/sling/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java?rev=628671&r1=628670&r2=628671&view=diff
==============================================================================
--- incubator/sling/trunk/sling/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java (original)
+++ incubator/sling/trunk/sling/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java Mon Feb 18 01:51:26 2008
@@ -94,8 +94,11 @@
         final ThreadPool pool = new ThreadPoolImpl();
         final ThreadPoolManager thpm = this.getMockery().mock(ThreadPoolManager.class);
         this.getMockery().checking(new Expectations() {{
-            allowing(thpm).get("SLING_EVENTING");
+            allowing(thpm).get(EventHelper.THREAD_POOL_NAME);
             will(returnValue(pool));
+            allowing(thpm).create(EventHelper.THREAD_POOL_NAME, 10, 30, 30, -1,
+                    ThreadPoolManager.DEFAULT_BLOCK_POLICY, true, 5000, null, 0, false);
+            will(returnValue(null));
         }});
 
         // lets set up the bundle context with the sling id

Modified: incubator/sling/trunk/sling/threads/src/main/java/org/apache/sling/threads/ThreadPoolManager.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/sling/threads/src/main/java/org/apache/sling/threads/ThreadPoolManager.java?rev=628671&r1=628670&r2=628671&view=diff
==============================================================================
--- incubator/sling/trunk/sling/threads/src/main/java/org/apache/sling/threads/ThreadPoolManager.java (original)
+++ incubator/sling/trunk/sling/threads/src/main/java/org/apache/sling/threads/ThreadPoolManager.java Mon Feb 18 01:51:26 2008
@@ -16,6 +16,8 @@
  */
 package org.apache.sling.threads;
 
+import java.util.concurrent.ThreadFactory;
+
 /**
 * The <code>ThreadPoolManager</code> manages thread pools.
  *
@@ -23,18 +25,54 @@
  */
 public interface ThreadPoolManager {
 
+    /** The default thread pool name */
+    String DEFAULT_THREADPOOL_NAME = "default";
+
+    /** The thread pool policies. */
+    enum ThreadPoolPolicy {
+        ABORT,
+        DISCARD,
+        DISCARDOLDEST,
+        RUN
+    };
+
+    /** The default policy */
+    ThreadPoolPolicy DEFAULT_BLOCK_POLICY = ThreadPoolPolicy.RUN;
+
     /**
      * Add a new pool.
      * If a pool with the same name already exists, the new pool is not added
      * and false is returned.
-     * @param pool The pool
+     * @param pool The pool.
      * @return True if the pool could be added, false otherwise.
      */
     boolean add(ThreadPool pool);
 
     /**
-     * Get a thread pool
+     * Get a thread pool.
+     * If there is no thread pool with the given name, the default thread
+     * pool is returned.
      * @param name The name of the thread pool or null for the default pool.
      */
     ThreadPool get(String name);
+
+    /**
+     * Create a new thread pool.
+     * If a pool with the same name already exists, no new pool is created
+     * and <code>null</code> is returned.
+     * @param name Name must not be null.
+     * @param blockPolicy The thread pool policy or null for the default.
+     * @param factory A thread factory or null for the default factory.
+     */
+    ThreadPool create(String name,
+                     int   minPoolSize,
+                     int   maxPoolSize,
+                     final int queueSize,
+                     long  keepAliveTime,
+                     ThreadPoolPolicy blockPolicy,
+                     final boolean shutdownGraceful,
+                     final int shutdownWaitTimeMs,
+                     final ThreadFactory factory,
+                     final int   priority,
+                     final boolean isDaemon);
 }

Modified: incubator/sling/trunk/sling/threads/src/main/java/org/apache/sling/threads/impl/DefaultThreadPool.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/sling/threads/src/main/java/org/apache/sling/threads/impl/DefaultThreadPool.java?rev=628671&r1=628670&r2=628671&view=diff
==============================================================================
--- incubator/sling/trunk/sling/threads/src/main/java/org/apache/sling/threads/impl/DefaultThreadPool.java (original)
+++ incubator/sling/trunk/sling/threads/src/main/java/org/apache/sling/threads/impl/DefaultThreadPool.java Mon Feb 18 01:51:26 2008
@@ -59,31 +59,12 @@
      * @param name - The name of the thread pool. If null {@link DefaultThreadPoolManager#DEFAULT_THREADPOOL_NAME}
      *               is used
      */
-    public DefaultThreadPool(final String name) {
-        this(DefaultThreadPoolManager.DEFAULT_THREADPOOL_NAME,
-                DefaultThreadPoolManager.DEFAULT_MIN_POOL_SIZE,
-                DefaultThreadPoolManager.DEFAULT_MAX_POOL_SIZE,
-                DefaultThreadPoolManager.DEFAULT_QUEUE_SIZE,
-                DefaultThreadPoolManager.DEFAULT_KEEP_ALIVE_TIME,
-                DefaultThreadPoolManager.DEFAULT_BLOCK_POLICY,
-                DefaultThreadPoolManager.DEFAULT_SHUTDOWN_GRACEFUL,
-                DefaultThreadPoolManager.DEFAULT_SHUTDOWN_WAIT_TIME,
-             null,
-             DefaultThreadPoolManager.DEFAULT_THREAD_PRIORITY,
-             DefaultThreadPoolManager.DEFAULT_DAEMON_MODE);
-    }
-
-    /**
-     * Create a new thread pool.
-     * @param name - The name of the thread pool. If null {@link DefaultThreadPoolManager#DEFAULT_THREADPOOL_NAME}
-     *               is used
-     */
     public DefaultThreadPool(final String name,
                              int   minPoolSize,
                              int   maxPoolSize,
                              final int queueSize,
                              long  keepAliveTime,
-                             String blockPolicy,
+                             ThreadPoolManager.ThreadPoolPolicy blockPolicy,
                              final boolean shutdownGraceful,
                              final int shutdownWaitTimeMs,
                              final ThreadFactory factory,
@@ -156,26 +137,20 @@
         if ( blockPolicy == null ) {
             blockPolicy = DefaultThreadPoolManager.DEFAULT_BLOCK_POLICY;
         }
-        final RejectedExecutionHandler handler;
-        if (DefaultThreadPoolManager.POLICY_ABORT.equalsIgnoreCase(blockPolicy)) {
-            handler = new ThreadPoolExecutor.AbortPolicy();
-        } else if (DefaultThreadPoolManager.POLICY_DISCARD.equalsIgnoreCase(blockPolicy)) {
-            handler = new ThreadPoolExecutor.AbortPolicy();
-        } else if (DefaultThreadPoolManager.POLICY_DISCARD_OLDEST.equalsIgnoreCase(blockPolicy)) {
-            handler = new ThreadPoolExecutor.AbortPolicy();
-        } else if (DefaultThreadPoolManager.POLICY_RUN.equalsIgnoreCase(blockPolicy)) {
-            handler = new ThreadPoolExecutor.AbortPolicy();
-        } else {
-            final StringBuffer msg = new StringBuffer();
-            msg.append("WARNING: Unknown block-policy configuration \"")
-                .append(blockPolicy);
-            msg.append("\". Should be one of \"").append(DefaultThreadPoolManager.POLICY_ABORT);
-            msg.append("\",\"").append(DefaultThreadPoolManager.POLICY_DISCARD);
-            msg.append("\",\"").append(DefaultThreadPoolManager.POLICY_DISCARD_OLDEST);
-            msg.append("\",\"").append(DefaultThreadPoolManager.POLICY_RUN);
-            msg.append("\". Will use \"").append(DefaultThreadPoolManager.DEFAULT_BLOCK_POLICY).append("\"");
-            logger.warn(msg.toString());
-            handler = new ThreadPoolExecutor.CallerRunsPolicy();
+        RejectedExecutionHandler handler = null;
+        switch (blockPolicy) {
+            case ABORT :
+                handler = new ThreadPoolExecutor.AbortPolicy();
+                break;
+            case DISCARD :
+                handler = new ThreadPoolExecutor.AbortPolicy();
+                break;
+            case DISCARDOLDEST :
+                handler = new ThreadPoolExecutor.AbortPolicy();
+                break;
+            case RUN :
+                handler = new ThreadPoolExecutor.AbortPolicy();
+                break;
         }
         this.shutdownGraceful = shutdownGraceful;
         this.shutdownWaitTimeMs = shutdownWaitTimeMs;

Modified: incubator/sling/trunk/sling/threads/src/main/java/org/apache/sling/threads/impl/DefaultThreadPoolManager.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/sling/threads/src/main/java/org/apache/sling/threads/impl/DefaultThreadPoolManager.java?rev=628671&r1=628670&r2=628671&view=diff
==============================================================================
--- incubator/sling/trunk/sling/threads/src/main/java/org/apache/sling/threads/impl/DefaultThreadPoolManager.java (original)
+++ incubator/sling/trunk/sling/threads/src/main/java/org/apache/sling/threads/impl/DefaultThreadPoolManager.java Mon Feb 18 01:51:26 2008
@@ -18,6 +18,7 @@
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ThreadFactory;
 
 import org.apache.sling.threads.ThreadPool;
 import org.apache.sling.threads.ThreadPoolManager;
@@ -60,24 +61,6 @@
     /** The default shutdown waittime time */
     protected final static int DEFAULT_SHUTDOWN_WAIT_TIME = -1;
 
-    /** The default shutdown waittime time */
-    protected final static String DEFAULT_THREADPOOL_NAME = "default";
-
-    /** ThreadPool block policy ABORT */
-    protected final static String POLICY_ABORT = "ABORT";
-
-    /** ThreadPool block policy DISCARD */
-    protected final static String POLICY_DISCARD = "DISCARD";
-
-    /** ThreadPool block policy DISCARD-OLDEST */
-    protected final static String POLICY_DISCARD_OLDEST = "DISCARDOLDEST";
-
-    /** ThreadPool block policy RUN */
-    protected final static String POLICY_RUN = "RUN";
-
-    /** The default shutdown waittime time */
-    protected final static String DEFAULT_BLOCK_POLICY = POLICY_RUN;
-
     /** By default we use the logger for this class. */
     protected Logger logger = LoggerFactory.getLogger(getClass());
 
@@ -101,7 +84,9 @@
                     null,
                     DEFAULT_THREAD_PRIORITY,
                     DEFAULT_DAEMON_MODE);
-        this.pools.put(defaultPool.getName(), defaultPool);
+        synchronized ( this.pools ) {
+            this.pools.put(defaultPool.getName(), defaultPool);
+        }
         this.logger.info("Thread pool manager startet with default pool.");
     }
 
@@ -112,14 +97,16 @@
         this.logger.info("Stopping thread pool manager.");
         this.logger.debug("Disposing all thread pools");
 
-        for (ThreadPool pool : this.pools.values()) {
-            this.logger.debug("Shutting down thread pool {}", pool.getName());
+        synchronized ( this.pools ) {
+            for (ThreadPool pool : this.pools.values()) {
+                this.logger.debug("Shutting down thread pool {}", pool.getName());
 
-            pool.shutdown();
+                pool.shutdown();
 
-            this.logger.debug("Thread pool " + pool.getName() + " is shut down.");
+                this.logger.debug("Thread pool " + pool.getName() + " is shut down.");
+            }
+            this.pools.clear();
         }
-        this.pools.clear();
         this.logger.info("Thread pool manager stopped.");
     }
 
@@ -152,4 +139,44 @@
             return pool;
         }
     }
+
+    /**
+     * @see org.apache.sling.threads.ThreadPoolManager#create(java.lang.String, int, int, int, long, org.apache.sling.threads.ThreadPoolManager.ThreadPoolPolicy, boolean, int, java.util.concurrent.ThreadFactory, int, boolean)
+     */
+    public ThreadPool create(String name,
+                             int minPoolSize,
+                             int maxPoolSize,
+                             int queueSize,
+                             long keepAliveTime,
+                             ThreadPoolPolicy blockPolicy,
+                             boolean shutdownGraceful,
+                             int shutdownWaitTimeMs,
+                             ThreadFactory factory,
+                             int priority,
+                             boolean isDaemon) {
+        if ( name == null ) {
+            throw new IllegalArgumentException("Name must not be null.");
+        }
+        synchronized ( this.pools ) {
+            ThreadPool pool = this.pools.get(name);
+            if ( pool != null ) {
+                // pool already exists
+                return null;
+            }
+            pool = new DefaultThreadPool(name,
+                                         minPoolSize,
+                                         maxPoolSize,
+                                         queueSize,
+                                         keepAliveTime,
+                                         blockPolicy,
+                                         shutdownGraceful,
+                                         shutdownWaitTimeMs,
+                                         factory,
+                                         priority,
+                                         isDaemon);
+            this.pools.put(name, pool);
+            return pool;
+        }
+    }
+
 }