You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@synapse.apache.org by as...@apache.org on 2007/04/30 08:43:44 UTC

svn commit: r533604 - in /webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp: ClientHandler.java ServerHandler.java util/BackportWorkerPool.java util/NativeWorkerPool.java util/WorkerPool.java util/WorkerPoolFactory.java

Author: asankha
Date: Sun Apr 29 23:43:43 2007
New Revision: 533604

URL: http://svn.apache.org/viewvc?view=rev&rev=533604
Log:
introduce mechanism to swich between native java.util.concurrent in JDK 1.5 or above and the backport for older versions

Added:
    webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/BackportWorkerPool.java
    webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/NativeWorkerPool.java
    webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/WorkerPool.java
    webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/WorkerPoolFactory.java
Modified:
    webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ClientHandler.java
    webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ServerHandler.java

Modified: webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ClientHandler.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ClientHandler.java?view=diff&rev=533604&r1=533603&r2=533604
==============================================================================
--- webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ClientHandler.java (original)
+++ webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ClientHandler.java Sun Apr 29 23:43:43 2007
@@ -27,10 +27,12 @@
 import org.apache.http.entity.BasicHttpEntity;
 import org.apache.http.impl.DefaultConnectionReuseStrategy;
 import org.apache.http.protocol.*;
-import org.apache.axis2.util.threadpool.DefaultThreadFactory;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.transport.nhttp.util.PipeImpl;
+import org.apache.axis2.transport.nhttp.util.NativeWorkerPool;
+import org.apache.axis2.transport.nhttp.util.WorkerPool;
+import org.apache.axis2.transport.nhttp.util.WorkerPoolFactory;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.axiom.soap.SOAP11Constants;
@@ -38,16 +40,10 @@
 
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
-import java.nio.channels.Pipe;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
 import java.io.IOException;
 
-import edu.emory.mathcs.backport.java.util.concurrent.Executor;
-import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
-import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
-import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
-
 /**
  * The client connection handler. An instance of this class is used by each IOReactor, to
  * process every connection. Hence this class should not store any data related to a single
@@ -67,7 +63,7 @@
     /** the Axis2 configuration context */
     ConfigurationContext cfgCtx = null;
 
-    private Executor workerPool = null;
+    private WorkerPool workerPool = null;
 
     private static final String REQUEST_BUFFER = "request-buffer";
     private static final String RESPONSE_BUFFER = "response-buffer";
@@ -91,13 +87,12 @@
         this.connStrategy = new DefaultConnectionReuseStrategy();
 
         NHttpConfiguration cfg = NHttpConfiguration.getInstance();
-        workerPool = new ThreadPoolExecutor(
+        workerPool = WorkerPoolFactory.getWorkerPool(
             cfg.getClientCoreThreads(),
             cfg.getClientMaxThreads(),
-            cfg.getClientKeepalive(), TimeUnit.SECONDS,
-            cfg.getClientQueueLen() == -1 ?
-                new LinkedBlockingQueue() : new LinkedBlockingQueue(cfg.getServerQueueLen()),
-            new DefaultThreadFactory(new ThreadGroup("Client Worker thread group"), "HttpClientWorker"));
+            cfg.getClientKeepalive(),
+            cfg.getClientQueueLen(),
+            "Client Worker thread group", "HttpClientWorker");
     }
 
     public void requestReady(final NHttpClientConnection conn) {

Modified: webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ServerHandler.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ServerHandler.java?view=diff&rev=533604&r1=533603&r2=533604
==============================================================================
--- webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ServerHandler.java (original)
+++ webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ServerHandler.java Sun Apr 29 23:43:43 2007
@@ -18,10 +18,11 @@
  */
 package org.apache.axis2.transport.nhttp;
 
-import edu.emory.mathcs.backport.java.util.concurrent.*;
 import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.util.threadpool.DefaultThreadFactory;
 import org.apache.axis2.transport.nhttp.util.PipeImpl;
+import org.apache.axis2.transport.nhttp.util.NativeWorkerPool;
+import org.apache.axis2.transport.nhttp.util.WorkerPool;
+import org.apache.axis2.transport.nhttp.util.WorkerPoolFactory;
 import org.apache.http.*;
 import org.apache.http.entity.BasicHttpEntity;
 import org.apache.http.entity.ByteArrayEntity;
@@ -40,7 +41,6 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
-import java.nio.channels.Pipe;
 import java.nio.channels.WritableByteChannel;
 import java.nio.channels.ReadableByteChannel;
 
@@ -68,7 +68,7 @@
     private boolean isHttps = false;
 
     /** the thread pool to process requests */
-    private Executor workerPool = null;
+    private WorkerPool workerPool = null;
 
     private static final String REQUEST_SINK_CHANNEL = "request-sink-channel";
     private static final String RESPONSE_SOURCE_CHANNEL = "response-source-channel";
@@ -86,13 +86,12 @@
         this.connStrategy = new DefaultConnectionReuseStrategy();
 
         NHttpConfiguration cfg = NHttpConfiguration.getInstance();
-        this.workerPool = new ThreadPoolExecutor(
+        this.workerPool = WorkerPoolFactory.getWorkerPool(
             cfg.getServerCoreThreads(),
             cfg.getServerMaxThreads(),
-            cfg.getServerKeepalive(), TimeUnit.SECONDS,
-            cfg.getServerQueueLen() == -1 ?
-                new LinkedBlockingQueue() : new LinkedBlockingQueue(cfg.getServerQueueLen()),
-            new DefaultThreadFactory(new ThreadGroup("Server Worker thread group"), "HttpServerWorker"));
+            cfg.getServerKeepalive(),
+            cfg.getServerQueueLen(),
+            "Server Worker thread group", "HttpServerWorker");
     }
 
     /**

Added: webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/BackportWorkerPool.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/BackportWorkerPool.java?view=auto&rev=533604
==============================================================================
--- webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/BackportWorkerPool.java (added)
+++ webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/BackportWorkerPool.java Sun Apr 29 23:43:43 2007
@@ -0,0 +1,86 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.axis2.transport.nhttp.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import edu.emory.mathcs.backport.java.util.concurrent.*;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.*;
+
+
+/**
+ * Utility class to support the backport util.concurrent in JDK 1.4 and the
+ * native concurrent package in JDK 1.5 or later
+ */
+public class BackportWorkerPool implements WorkerPool{
+
+    private static final Log log = LogFactory.getLog(BackportWorkerPool.class);
+
+    java.util.concurrent.Executor nativeExecutor = null;
+    Executor executor = null;
+
+    public BackportWorkerPool(int core, int max, int keepAlive,
+        int queueLength, String threadGroupName, String threadGroupId) {
+
+        log.debug("Using backport of the util.concurrent package..");
+        executor = new ThreadPoolExecutor(
+            core, max, keepAlive,
+            TimeUnit.SECONDS,
+            queueLength == -1 ?
+                new LinkedBlockingQueue() :
+                new LinkedBlockingQueue(queueLength),
+            new BackportThreadFactory(new ThreadGroup(threadGroupName), threadGroupId));
+    }
+
+    public void execute(Runnable task) {
+        executor.execute(task);
+    }
+
+    /**
+     * This is a simple ThreadFactory implementation using java.util.concurrent
+     * Creates threads with the given name prefix
+     */
+    public class BackportThreadFactory implements
+        ThreadFactory {
+
+        final ThreadGroup group;
+        final AtomicInteger count;
+        final String namePrefix;
+
+        public BackportThreadFactory(final ThreadGroup group, final String namePrefix) {
+            super();
+            this.count = new AtomicInteger(1);
+            this.group = group;
+            this.namePrefix = namePrefix;
+        }
+
+        public Thread newThread(final Runnable runnable) {
+            StringBuffer buffer = new StringBuffer();
+            buffer.append(this.namePrefix);
+            buffer.append('-');
+            buffer.append(this.count.getAndIncrement());
+            Thread t = new Thread(group, runnable, buffer.toString(), 0);
+            t.setDaemon(false);
+            t.setPriority(Thread.NORM_PRIORITY);
+            return t;
+        }
+
+    }
+}

Added: webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/NativeWorkerPool.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/NativeWorkerPool.java?view=auto&rev=533604
==============================================================================
--- webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/NativeWorkerPool.java (added)
+++ webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/NativeWorkerPool.java Sun Apr 29 23:43:43 2007
@@ -0,0 +1,86 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.axis2.transport.nhttp.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+
+/**
+ * Utility class to support the backport util.concurrent in JDK 1.4 and the
+ * native concurrent package in JDK 1.5 or later
+ */
+public class NativeWorkerPool implements WorkerPool {
+
+    private static final Log log = LogFactory.getLog(NativeWorkerPool.class);
+
+    java.util.concurrent.Executor nativeExecutor = null;
+    Executor executor = null;
+
+    public NativeWorkerPool(int core, int max, int keepAlive,
+        int queueLength, String threadGroupName, String threadGroupId) {
+
+        log.debug("Using native util.concurrent package..");
+        executor = new ThreadPoolExecutor(
+            core, max, keepAlive,
+            TimeUnit.SECONDS,
+            queueLength == -1 ?
+                new LinkedBlockingQueue() :
+                new LinkedBlockingQueue(queueLength),
+            new BackportThreadFactory(new ThreadGroup(threadGroupName), threadGroupId));
+    }
+
+    public void execute(Runnable task) {
+        executor.execute(task);
+    }
+
+    /**
+     * This is a simple ThreadFactory implementation using java.util.concurrent
+     * Creates threads with the given name prefix
+     */
+    public class BackportThreadFactory implements
+        ThreadFactory {
+
+        final ThreadGroup group;
+        final AtomicInteger count;
+        final String namePrefix;
+
+        public BackportThreadFactory(final ThreadGroup group, final String namePrefix) {
+            super();
+            this.count = new AtomicInteger(1);
+            this.group = group;
+            this.namePrefix = namePrefix;
+        }
+
+        public Thread newThread(final Runnable runnable) {
+            StringBuffer buffer = new StringBuffer();
+            buffer.append(this.namePrefix);
+            buffer.append('-');
+            buffer.append(this.count.getAndIncrement());
+            Thread t = new Thread(group, runnable, buffer.toString(), 0);
+            t.setDaemon(false);
+            t.setPriority(Thread.NORM_PRIORITY);
+            return t;
+        }
+
+    }
+}

Added: webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/WorkerPool.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/WorkerPool.java?view=auto&rev=533604
==============================================================================
--- webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/WorkerPool.java (added)
+++ webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/WorkerPool.java Sun Apr 29 23:43:43 2007
@@ -0,0 +1,24 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.axis2.transport.nhttp.util;
+
+public interface WorkerPool {
+    public void execute(Runnable task);
+}

Added: webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/WorkerPoolFactory.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/WorkerPoolFactory.java?view=auto&rev=533604
==============================================================================
--- webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/WorkerPoolFactory.java (added)
+++ webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/WorkerPoolFactory.java Sun Apr 29 23:43:43 2007
@@ -0,0 +1,38 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.axis2.transport.nhttp.util;
+
+/**
+ * Dynamically select util.concurrent implemenation
+ */
+public class WorkerPoolFactory {
+
+    public static WorkerPool getWorkerPool(int core, int max, int keepAlive,
+        int queueLength, String threadGroupName, String threadGroupId) {
+        try {
+            Class.forName("java.util.concurrent.ThreadPoolExecutor");
+            return new NativeWorkerPool(
+                core, max, keepAlive, queueLength, threadGroupName, threadGroupId);
+        } catch (ClassNotFoundException e) {
+            return new BackportWorkerPool(
+                core, max, keepAlive, queueLength, threadGroupName, threadGroupId);
+        }
+    }
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: synapse-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: synapse-dev-help@ws.apache.org