You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@synapse.apache.org by as...@apache.org on 2007/04/30 08:43:44 UTC
svn commit: r533604 - in
/webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp:
ClientHandler.java ServerHandler.java util/BackportWorkerPool.java
util/NativeWorkerPool.java util/WorkerPool.java util/WorkerPoolFactory.java
Author: asankha
Date: Sun Apr 29 23:43:43 2007
New Revision: 533604
URL: http://svn.apache.org/viewvc?view=rev&rev=533604
Log:
introduce mechanism to swich between native java.util.concurrent in JDK 1.5 or above and the backport for older versions
Added:
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/BackportWorkerPool.java
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/NativeWorkerPool.java
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/WorkerPool.java
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/WorkerPoolFactory.java
Modified:
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ClientHandler.java
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ServerHandler.java
Modified: webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ClientHandler.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ClientHandler.java?view=diff&rev=533604&r1=533603&r2=533604
==============================================================================
--- webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ClientHandler.java (original)
+++ webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ClientHandler.java Sun Apr 29 23:43:43 2007
@@ -27,10 +27,12 @@
import org.apache.http.entity.BasicHttpEntity;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.protocol.*;
-import org.apache.axis2.util.threadpool.DefaultThreadFactory;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.transport.nhttp.util.PipeImpl;
+import org.apache.axis2.transport.nhttp.util.NativeWorkerPool;
+import org.apache.axis2.transport.nhttp.util.WorkerPool;
+import org.apache.axis2.transport.nhttp.util.WorkerPoolFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.axiom.soap.SOAP11Constants;
@@ -38,16 +40,10 @@
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
-import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.io.IOException;
-import edu.emory.mathcs.backport.java.util.concurrent.Executor;
-import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
-import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
-import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
-
/**
* The client connection handler. An instance of this class is used by each IOReactor, to
* process every connection. Hence this class should not store any data related to a single
@@ -67,7 +63,7 @@
/** the Axis2 configuration context */
ConfigurationContext cfgCtx = null;
- private Executor workerPool = null;
+ private WorkerPool workerPool = null;
private static final String REQUEST_BUFFER = "request-buffer";
private static final String RESPONSE_BUFFER = "response-buffer";
@@ -91,13 +87,12 @@
this.connStrategy = new DefaultConnectionReuseStrategy();
NHttpConfiguration cfg = NHttpConfiguration.getInstance();
- workerPool = new ThreadPoolExecutor(
+ workerPool = WorkerPoolFactory.getWorkerPool(
cfg.getClientCoreThreads(),
cfg.getClientMaxThreads(),
- cfg.getClientKeepalive(), TimeUnit.SECONDS,
- cfg.getClientQueueLen() == -1 ?
- new LinkedBlockingQueue() : new LinkedBlockingQueue(cfg.getServerQueueLen()),
- new DefaultThreadFactory(new ThreadGroup("Client Worker thread group"), "HttpClientWorker"));
+ cfg.getClientKeepalive(),
+ cfg.getClientQueueLen(),
+ "Client Worker thread group", "HttpClientWorker");
}
public void requestReady(final NHttpClientConnection conn) {
Modified: webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ServerHandler.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ServerHandler.java?view=diff&rev=533604&r1=533603&r2=533604
==============================================================================
--- webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ServerHandler.java (original)
+++ webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ServerHandler.java Sun Apr 29 23:43:43 2007
@@ -18,10 +18,11 @@
*/
package org.apache.axis2.transport.nhttp;
-import edu.emory.mathcs.backport.java.util.concurrent.*;
import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.util.threadpool.DefaultThreadFactory;
import org.apache.axis2.transport.nhttp.util.PipeImpl;
+import org.apache.axis2.transport.nhttp.util.NativeWorkerPool;
+import org.apache.axis2.transport.nhttp.util.WorkerPool;
+import org.apache.axis2.transport.nhttp.util.WorkerPoolFactory;
import org.apache.http.*;
import org.apache.http.entity.BasicHttpEntity;
import org.apache.http.entity.ByteArrayEntity;
@@ -40,7 +41,6 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
-import java.nio.channels.Pipe;
import java.nio.channels.WritableByteChannel;
import java.nio.channels.ReadableByteChannel;
@@ -68,7 +68,7 @@
private boolean isHttps = false;
/** the thread pool to process requests */
- private Executor workerPool = null;
+ private WorkerPool workerPool = null;
private static final String REQUEST_SINK_CHANNEL = "request-sink-channel";
private static final String RESPONSE_SOURCE_CHANNEL = "response-source-channel";
@@ -86,13 +86,12 @@
this.connStrategy = new DefaultConnectionReuseStrategy();
NHttpConfiguration cfg = NHttpConfiguration.getInstance();
- this.workerPool = new ThreadPoolExecutor(
+ this.workerPool = WorkerPoolFactory.getWorkerPool(
cfg.getServerCoreThreads(),
cfg.getServerMaxThreads(),
- cfg.getServerKeepalive(), TimeUnit.SECONDS,
- cfg.getServerQueueLen() == -1 ?
- new LinkedBlockingQueue() : new LinkedBlockingQueue(cfg.getServerQueueLen()),
- new DefaultThreadFactory(new ThreadGroup("Server Worker thread group"), "HttpServerWorker"));
+ cfg.getServerKeepalive(),
+ cfg.getServerQueueLen(),
+ "Server Worker thread group", "HttpServerWorker");
}
/**
Added: webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/BackportWorkerPool.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/BackportWorkerPool.java?view=auto&rev=533604
==============================================================================
--- webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/BackportWorkerPool.java (added)
+++ webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/BackportWorkerPool.java Sun Apr 29 23:43:43 2007
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.axis2.transport.nhttp.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import edu.emory.mathcs.backport.java.util.concurrent.*;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.*;
+
+
+/**
+ * Utility class to support the backport util.concurrent in JDK 1.4 and the
+ * native concurrent package in JDK 1.5 or later
+ */
+public class BackportWorkerPool implements WorkerPool{
+
+ private static final Log log = LogFactory.getLog(BackportWorkerPool.class);
+
+ java.util.concurrent.Executor nativeExecutor = null;
+ Executor executor = null;
+
+ public BackportWorkerPool(int core, int max, int keepAlive,
+ int queueLength, String threadGroupName, String threadGroupId) {
+
+ log.debug("Using backport of the util.concurrent package..");
+ executor = new ThreadPoolExecutor(
+ core, max, keepAlive,
+ TimeUnit.SECONDS,
+ queueLength == -1 ?
+ new LinkedBlockingQueue() :
+ new LinkedBlockingQueue(queueLength),
+ new BackportThreadFactory(new ThreadGroup(threadGroupName), threadGroupId));
+ }
+
+ public void execute(Runnable task) {
+ executor.execute(task);
+ }
+
+ /**
+ * This is a simple ThreadFactory implementation using java.util.concurrent
+ * Creates threads with the given name prefix
+ */
+ public class BackportThreadFactory implements
+ ThreadFactory {
+
+ final ThreadGroup group;
+ final AtomicInteger count;
+ final String namePrefix;
+
+ public BackportThreadFactory(final ThreadGroup group, final String namePrefix) {
+ super();
+ this.count = new AtomicInteger(1);
+ this.group = group;
+ this.namePrefix = namePrefix;
+ }
+
+ public Thread newThread(final Runnable runnable) {
+ StringBuffer buffer = new StringBuffer();
+ buffer.append(this.namePrefix);
+ buffer.append('-');
+ buffer.append(this.count.getAndIncrement());
+ Thread t = new Thread(group, runnable, buffer.toString(), 0);
+ t.setDaemon(false);
+ t.setPriority(Thread.NORM_PRIORITY);
+ return t;
+ }
+
+ }
+}
Added: webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/NativeWorkerPool.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/NativeWorkerPool.java?view=auto&rev=533604
==============================================================================
--- webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/NativeWorkerPool.java (added)
+++ webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/NativeWorkerPool.java Sun Apr 29 23:43:43 2007
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.axis2.transport.nhttp.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+
+/**
+ * Utility class to support the backport util.concurrent in JDK 1.4 and the
+ * native concurrent package in JDK 1.5 or later
+ */
+public class NativeWorkerPool implements WorkerPool {
+
+ private static final Log log = LogFactory.getLog(NativeWorkerPool.class);
+
+ java.util.concurrent.Executor nativeExecutor = null;
+ Executor executor = null;
+
+ public NativeWorkerPool(int core, int max, int keepAlive,
+ int queueLength, String threadGroupName, String threadGroupId) {
+
+ log.debug("Using native util.concurrent package..");
+ executor = new ThreadPoolExecutor(
+ core, max, keepAlive,
+ TimeUnit.SECONDS,
+ queueLength == -1 ?
+ new LinkedBlockingQueue() :
+ new LinkedBlockingQueue(queueLength),
+ new BackportThreadFactory(new ThreadGroup(threadGroupName), threadGroupId));
+ }
+
+ public void execute(Runnable task) {
+ executor.execute(task);
+ }
+
+ /**
+ * This is a simple ThreadFactory implementation using java.util.concurrent
+ * Creates threads with the given name prefix
+ */
+ public class BackportThreadFactory implements
+ ThreadFactory {
+
+ final ThreadGroup group;
+ final AtomicInteger count;
+ final String namePrefix;
+
+ public BackportThreadFactory(final ThreadGroup group, final String namePrefix) {
+ super();
+ this.count = new AtomicInteger(1);
+ this.group = group;
+ this.namePrefix = namePrefix;
+ }
+
+ public Thread newThread(final Runnable runnable) {
+ StringBuffer buffer = new StringBuffer();
+ buffer.append(this.namePrefix);
+ buffer.append('-');
+ buffer.append(this.count.getAndIncrement());
+ Thread t = new Thread(group, runnable, buffer.toString(), 0);
+ t.setDaemon(false);
+ t.setPriority(Thread.NORM_PRIORITY);
+ return t;
+ }
+
+ }
+}
Added: webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/WorkerPool.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/WorkerPool.java?view=auto&rev=533604
==============================================================================
--- webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/WorkerPool.java (added)
+++ webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/WorkerPool.java Sun Apr 29 23:43:43 2007
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.axis2.transport.nhttp.util;
+
+public interface WorkerPool {
+ public void execute(Runnable task);
+}
Added: webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/WorkerPoolFactory.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/WorkerPoolFactory.java?view=auto&rev=533604
==============================================================================
--- webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/WorkerPoolFactory.java (added)
+++ webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/WorkerPoolFactory.java Sun Apr 29 23:43:43 2007
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.axis2.transport.nhttp.util;
+
+/**
+ * Dynamically select util.concurrent implemenation
+ */
+public class WorkerPoolFactory {
+
+ public static WorkerPool getWorkerPool(int core, int max, int keepAlive,
+ int queueLength, String threadGroupName, String threadGroupId) {
+ try {
+ Class.forName("java.util.concurrent.ThreadPoolExecutor");
+ return new NativeWorkerPool(
+ core, max, keepAlive, queueLength, threadGroupName, threadGroupId);
+ } catch (ClassNotFoundException e) {
+ return new BackportWorkerPool(
+ core, max, keepAlive, queueLength, threadGroupName, threadGroupId);
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: synapse-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: synapse-dev-help@ws.apache.org