You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-commits@axis.apache.org by su...@apache.org on 2011/01/17 09:04:19 UTC
svn commit: r1059789 - in /axis/axis2/java/transports/trunk/modules/base/src:
main/java/org/apache/axis2/transport/base/threads/
main/java/org/apache/axis2/transport/base/threads/watermark/
test/java/org/apache/axis2/transport/base/threads/ test/java/o...
Author: supun
Date: Mon Jan 17 08:04:18 2011
New Revision: 1059789
URL: http://svn.apache.org/viewvc?rev=1059789&view=rev
Log:
adding a ThreadPoolExecutor with a waterMark to control the threading behavior
Added:
axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/
axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/DefaultWaterMarkQueue.java
axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkExecutor.java
axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkQueue.java
axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkRejectionHandler.java
axis/axis2/java/transports/trunk/modules/base/src/test/java/org/apache/axis2/transport/base/threads/
axis/axis2/java/transports/trunk/modules/base/src/test/java/org/apache/axis2/transport/base/threads/watermark/
axis/axis2/java/transports/trunk/modules/base/src/test/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkExecutorTest.java
Modified:
axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/NativeWorkerPool.java
axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/WorkerPoolFactory.java
Modified: axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/NativeWorkerPool.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/NativeWorkerPool.java?rev=1059789&r1=1059788&r2=1059789&view=diff
==============================================================================
--- axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/NativeWorkerPool.java (original)
+++ axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/NativeWorkerPool.java Mon Jan 17 08:04:18 2011
@@ -19,11 +19,13 @@
package org.apache.axis2.transport.base.threads;
+import org.apache.axis2.transport.base.threads.watermark.DefaultWaterMarkQueue;
+import org.apache.axis2.transport.base.threads.watermark.WaterMarkExecutor;
+import org.apache.axis2.transport.base.threads.watermark.WaterMarkQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicInteger;
/**
* Worker pool implementation based on java.util.concurrent in JDK 1.5 or later.
@@ -33,7 +35,7 @@ public class NativeWorkerPool implements
static final Log log = LogFactory.getLog(NativeWorkerPool.class);
private final ThreadPoolExecutor executor;
- private final LinkedBlockingQueue<Runnable> blockingQueue;
+ private final BlockingQueue<Runnable> blockingQueue;
public NativeWorkerPool(int core, int max, int keepAlive,
int queueLength, String threadGroupName, String threadGroupId) {
@@ -45,10 +47,123 @@ public class NativeWorkerPool implements
(queueLength == -1 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queueLength));
executor = new ThreadPoolExecutor(
- core, max, keepAlive,
- TimeUnit.SECONDS,
- blockingQueue,
- new NativeThreadFactory(new ThreadGroup(threadGroupName), threadGroupId));
+ core, max, keepAlive,
+ TimeUnit.SECONDS,
+ blockingQueue,
+ new NativeThreadFactory(new ThreadGroup(threadGroupName), threadGroupId));
+ }
+
+ public NativeWorkerPool(int core, int max, int keepAlive,
+ int queueLength, String threadGroupName,
+ String threadGroupId, BlockingQueue<Runnable> queue) {
+
+ if (log.isDebugEnabled()) {
+ log.debug("Using native util.concurrent package..");
+ }
+
+ if (queue == null) {
+ blockingQueue =
+ (queueLength == -1 ? new LinkedBlockingQueue<Runnable>()
+ : new LinkedBlockingQueue<Runnable>(queueLength));
+ } else {
+ blockingQueue = queue;
+ }
+
+ executor = new ThreadPoolExecutor(
+ core, max, keepAlive,
+ TimeUnit.SECONDS,
+ blockingQueue,
+ new NativeThreadFactory(new ThreadGroup(threadGroupName), threadGroupId));
+ }
+
+ public NativeWorkerPool(int core, int max, int keepAlive,
+ int queueLength, String threadGroupName,
+ String threadGroupId, BlockingQueue<Runnable> queue,
+ RejectedExecutionHandler rejectedExecutionHandler) {
+
+ if (log.isDebugEnabled()) {
+ log.debug("Using native util.concurrent package..");
+ }
+
+ if (queue == null) {
+ blockingQueue =
+ (queueLength == -1 ? new LinkedBlockingQueue<Runnable>()
+ : new LinkedBlockingQueue<Runnable>(queueLength));
+ } else {
+ blockingQueue = queue;
+ }
+
+ executor = new ThreadPoolExecutor(
+ core, max, keepAlive,
+ TimeUnit.SECONDS,
+ blockingQueue,
+ new NativeThreadFactory(new ThreadGroup(threadGroupName), threadGroupId),
+ rejectedExecutionHandler);
+ }
+
+ public NativeWorkerPool(int core, int max, int keepAlive,
+ int queueLength, int waterMark, String threadGroupName,
+ String threadGroupId) {
+
+ if (log.isDebugEnabled()) {
+ log.debug("Using native util.concurrent package..");
+ }
+
+
+ blockingQueue =
+ (queueLength == -1 ? new DefaultWaterMarkQueue<Runnable>(waterMark)
+ : new DefaultWaterMarkQueue<Runnable>(waterMark, queueLength));
+
+ executor = new WaterMarkExecutor(
+ core, max, keepAlive,
+ TimeUnit.SECONDS,
+ (WaterMarkQueue<Runnable>) blockingQueue,
+ new NativeThreadFactory(new ThreadGroup(threadGroupName), threadGroupId));
+ }
+
+ public NativeWorkerPool(int core, int max, int keepAlive,
+ int queueLength, int waterMark, String threadGroupName,
+ String threadGroupId, WaterMarkQueue<Runnable> queue) {
+
+ if (log.isDebugEnabled()) {
+ log.debug("Using native util.concurrent package..");
+ }
+
+ if (queue == null) {
+ blockingQueue =
+ (queueLength == -1 ? new DefaultWaterMarkQueue<Runnable>(waterMark)
+ : new DefaultWaterMarkQueue<Runnable>(waterMark, queueLength));
+ } else {
+ blockingQueue = queue;
+ }
+
+ executor = new WaterMarkExecutor(
+ core, max, keepAlive,
+ TimeUnit.SECONDS,
+ (WaterMarkQueue<Runnable>) blockingQueue,
+ new NativeThreadFactory(new ThreadGroup(threadGroupName), threadGroupId));
+ }
+
+ public NativeWorkerPool(int core, int max, int keepAlive,
+ int queueLength, int waterMark, String threadGroupName,
+ String threadGroupId,
+ RejectedExecutionHandler rejectedExecutionHandler) {
+
+ if (log.isDebugEnabled()) {
+ log.debug("Using native util.concurrent package..");
+ }
+
+
+ blockingQueue =
+ (queueLength == -1 ? new DefaultWaterMarkQueue<Runnable>(waterMark)
+ : new DefaultWaterMarkQueue<Runnable>(waterMark, queueLength));
+
+ executor = new WaterMarkExecutor(
+ core, max, keepAlive,
+ TimeUnit.SECONDS,
+ (WaterMarkQueue<Runnable>) blockingQueue,
+ new NativeThreadFactory(new ThreadGroup(threadGroupName), threadGroupId),
+ rejectedExecutionHandler);
}
public void execute(final Runnable task) {
Modified: axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/WorkerPoolFactory.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/WorkerPoolFactory.java?rev=1059789&r1=1059788&r2=1059789&view=diff
==============================================================================
--- axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/WorkerPoolFactory.java (original)
+++ axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/WorkerPoolFactory.java Mon Jan 17 08:04:18 2011
@@ -19,6 +19,8 @@
package org.apache.axis2.transport.base.threads;
+import java.util.concurrent.BlockingQueue;
+
/**
* Worker pool factory.
* For the moment this always creates {@link NativeWorkerPool} instances since
@@ -27,8 +29,25 @@ package org.apache.axis2.transport.base.
public class WorkerPoolFactory {
public static WorkerPool getWorkerPool(int core, int max, int keepAlive,
- int queueLength, String threadGroupName, String threadGroupId) {
+ int queueLength, String threadGroupName,
+ String threadGroupId) {
return new NativeWorkerPool(
core, max, keepAlive, queueLength, threadGroupName, threadGroupId);
}
+
+ public static WorkerPool getWorkerPool(int core, int max, int keepAlive,
+ int queueLength, int waterMark, String threadGroupName,
+ String threadGroupId) {
+ return new NativeWorkerPool(core, max, keepAlive,
+ queueLength, waterMark, threadGroupName,
+ threadGroupId);
+ }
+
+ public static WorkerPool getWorkerPool(int core, int max, int keepAlive,
+ int queueLength, String threadGroupName,
+ String threadGroupId, BlockingQueue<Runnable> queue) {
+ return new NativeWorkerPool(core, max, keepAlive,
+ queueLength, threadGroupName,
+ threadGroupId, queue);
+ }
}
Added: axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/DefaultWaterMarkQueue.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/DefaultWaterMarkQueue.java?rev=1059789&view=auto
==============================================================================
--- axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/DefaultWaterMarkQueue.java (added)
+++ axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/DefaultWaterMarkQueue.java Mon Jan 17 08:04:18 2011
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.axis2.transport.base.threads.watermark;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * A Default implementation for WaterMarkQueue interface. The implementation uses an
+ * {@link ArrayBlockingQueue} up to water mark. Then it uses a {@link LinkedBlockingQueue} or
+ * ArrayBlocking queue from the water mark point. The LinkedBlockingQueue is used if a queue
+ * size is specified other than the waterMark.
+ *
+ * @param <T>
+ */
+public class DefaultWaterMarkQueue<T> implements WaterMarkQueue<T> {
+
+ private volatile ArrayBlockingQueue<T> waterMarkQueue;
+
+ private volatile Queue<T> afterWaterMarkQueue;
+
+ private Lock lock = new ReentrantLock();
+
+ /**
+ * Create a {@link WaterMarkQueue} with a waterMark. The queue will first fill up
+ * to waterMark. These items will be inserted in to an {@link ArrayBlockingQueue}.
+ * After this an {@link LinkedBlockingQueue} will be used without a bound.
+ *
+ * @param waterMark the waterMark of the queue
+ */
+ public DefaultWaterMarkQueue(int waterMark) {
+ afterWaterMarkQueue = new LinkedBlockingDeque<T>();
+
+ waterMarkQueue = new ArrayBlockingQueue<T>(waterMark);
+ }
+
+ /**
+ * Create a {@link WaterMarkQueue} with a waterMark. The queue will first fill up
+ * to waterMark. These items will be inserted in to an {@link ArrayBlockingQueue}.
+ * After this an {@link LinkedBlockingQueue} will be used with capacity
+ * <code>size - waterMark.</code>
+ *
+ * @param waterMark the waterMark of the queue
+ * @param size the size of the queue
+ */
+ public DefaultWaterMarkQueue(int waterMark, int size) {
+ if (waterMark <= size) {
+ afterWaterMarkQueue = new ArrayBlockingQueue<T>(size - waterMark);
+ } else {
+ throw new IllegalArgumentException("Size should be equal or greater than water mark");
+ }
+
+ waterMarkQueue = new ArrayBlockingQueue<T>(waterMark);
+ }
+
+ public boolean add(T t) {
+ return waterMarkQueue.add(t);
+
+ }
+
+ public boolean offer(T t) {
+ return waterMarkQueue.offer(t);
+ }
+
+ public T remove() {
+ T t = waterMarkQueue.remove();
+ tryMoveTasks();
+ return t;
+ }
+
+ public T poll() {
+ T t = waterMarkQueue.poll();
+ tryMoveTasks();
+ return t;
+ }
+
+ public T element() {
+ return waterMarkQueue.element();
+ }
+
+ public T peek() {
+ return waterMarkQueue.peek();
+ }
+
+ public void put(T t) throws InterruptedException {
+ waterMarkQueue.put(t);
+ }
+
+ public boolean offer(T t, long l, TimeUnit timeUnit) throws InterruptedException {
+ return waterMarkQueue.offer(t, l, timeUnit);
+ }
+
+ public T take() throws InterruptedException {
+ T t = waterMarkQueue.take();
+ tryMoveTasks();
+ return t;
+ }
+
+ public T poll(long l, TimeUnit timeUnit) throws InterruptedException {
+ T t = waterMarkQueue.poll(l, timeUnit);
+ tryMoveTasks();
+ return t;
+ }
+
+ public int remainingCapacity() {
+ return waterMarkQueue.remainingCapacity();
+ }
+
+ public boolean remove(Object o) {
+ boolean b = waterMarkQueue.remove(o);
+ tryMoveTasks();
+ return b;
+ }
+
+ public boolean containsAll(Collection<?> objects) {
+ return waterMarkQueue.containsAll(objects);
+ }
+
+ public boolean addAll(Collection<? extends T> ts) {
+ return waterMarkQueue.addAll(ts);
+ }
+
+ public boolean removeAll(Collection<?> objects) {
+ boolean b = waterMarkQueue.removeAll(objects);
+ tryMoveTasks();
+
+ return b;
+ }
+
+ public boolean retainAll(Collection<?> objects) {
+ return waterMarkQueue.retainAll(objects);
+ }
+
+ public void clear() {
+ waterMarkQueue.clear();
+ afterWaterMarkQueue.clear();
+ }
+
+ public int size() {
+ return waterMarkQueue.size() + afterWaterMarkQueue.size();
+ }
+
+ public boolean isEmpty() {
+ tryMoveTasks();
+ return waterMarkQueue.isEmpty();
+ }
+
+ private void tryMoveTasks() {
+ if (afterWaterMarkQueue.size() > 0) {
+ lock.lock();
+ try {
+ while (afterWaterMarkQueue.size() > 0) {
+ T w = afterWaterMarkQueue.poll();
+ boolean offer = waterMarkQueue.offer(w);
+ if (!offer) {
+ afterWaterMarkQueue.offer(w);
+ break;
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ public boolean contains(Object o) {
+ return waterMarkQueue.contains(o) || afterWaterMarkQueue.contains(o);
+ }
+
+ public Iterator<T> iterator() {
+ return new IteratorImpl();
+ }
+
+ public Object[] toArray() {
+ return waterMarkQueue.toArray();
+ }
+
+ public <T> T[] toArray(T[] ts) {
+ T[] waterMarkArray = waterMarkQueue.toArray(ts);
+ T[] afterWaterMarkArray = afterWaterMarkQueue.toArray(ts);
+
+ final int alen = waterMarkArray.length;
+ final int blen = afterWaterMarkArray.length;
+ if (alen == 0) {
+ return afterWaterMarkArray;
+ }
+
+ if (blen == 0) {
+ return waterMarkArray;
+ }
+
+ final T[] result = (T[]) java.lang.reflect.Array.
+ newInstance(waterMarkArray.getClass().getComponentType(), alen + blen);
+ System.arraycopy(waterMarkArray, 0, result, 0, alen);
+ System.arraycopy(afterWaterMarkArray, 0, result, alen, blen);
+ return result;
+ }
+
+ public int drainTo(Collection<? super T> objects) {
+ int n = waterMarkQueue.drainTo(objects);
+ tryMoveTasks();
+
+ return n;
+ }
+
+ public int drainTo(Collection<? super T> objects, int i) {
+ int n = waterMarkQueue.drainTo(objects, i);
+ tryMoveTasks();
+ return n;
+ }
+
+ public boolean offerAfter(T t) {
+ lock.lock();
+ try {
+ return afterWaterMarkQueue.offer(t);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Iterator for DefaultWaterMarkQueue
+ */
+ private class IteratorImpl implements Iterator<T> {
+ Iterator<T> waterMarkIterator = null;
+
+ Iterator<T> afterWaterMarkIterator = null;
+
+ boolean waterMarkQueueDone = false;
+
+ private IteratorImpl() {
+ waterMarkIterator = waterMarkQueue.iterator();
+ afterWaterMarkIterator = afterWaterMarkQueue.iterator();
+
+ waterMarkQueueDone = false;
+ }
+
+ public boolean hasNext() {
+ return waterMarkIterator.hasNext() || afterWaterMarkIterator.hasNext();
+ }
+
+ public T next() {
+ lock.lock();
+ try {
+ if (waterMarkIterator.hasNext()) {
+ return waterMarkIterator.next();
+ } else {
+ waterMarkQueueDone = true;
+ return afterWaterMarkIterator.next();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void remove() {
+ if (!waterMarkQueueDone) {
+ waterMarkIterator.remove();
+ } else {
+ afterWaterMarkIterator.remove();
+ }
+ }
+ }
+}
Added: axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkExecutor.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkExecutor.java?rev=1059789&view=auto
==============================================================================
--- axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkExecutor.java (added)
+++ axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkExecutor.java Mon Jan 17 08:04:18 2011
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.axis2.transport.base.threads.watermark;
+
+import java.util.concurrent.*;
+
+/**
+ * An {@link ExecutorService} that executes each submitted task using
+ * one of possibly several pooled threads, but the execution happens differently
+ * from the {@link ThreadPoolExecutor}. In this executor after all the core pool threads
+ * are used queuing happens until the water mark. If the more tasks are submitted after
+ * the queue is filled up to the water mark the number of threads increases to max.
+ * If the number of tasks continue to increase the Queue begins to fill up. If the queue
+ * is a bounded queue and the queue is completely filled a {@link RejectedExecutionHandler}
+ * is executed if one specified. Otherwise the task is rejected.
+ */
+public class WaterMarkExecutor extends ThreadPoolExecutor {
+ public WaterMarkExecutor(int core, int max, long keepAlive,
+ TimeUnit timeUnit, WaterMarkQueue<Runnable> queue) {
+ super(core, max, keepAlive, timeUnit, queue, new WaterMarkRejectionHandler(null));
+ }
+
+ public WaterMarkExecutor(int core, int max, long keepAlive,
+ TimeUnit timeUnit, WaterMarkQueue<Runnable> queue,
+ ThreadFactory threadFactory) {
+ super(core, max, keepAlive,
+ timeUnit, queue, threadFactory, new WaterMarkRejectionHandler(null));
+ }
+
+ public WaterMarkExecutor(int core, int max,
+ long keepAlive, TimeUnit timeUnit,
+ WaterMarkQueue<Runnable> queue,
+ RejectedExecutionHandler rejectedExecutionHandler) {
+
+ super(core, max, keepAlive, timeUnit,
+ queue, new WaterMarkRejectionHandler(rejectedExecutionHandler));
+ }
+
+ public WaterMarkExecutor(int core, int max, long keepAlive,
+ TimeUnit timeUnit, WaterMarkQueue<Runnable> queue,
+ ThreadFactory threadFactory,
+ RejectedExecutionHandler rejectedExecutionHandler) {
+ super(core, max, keepAlive, timeUnit,
+ queue, threadFactory, new WaterMarkRejectionHandler(rejectedExecutionHandler));
+ }
+}
Added: axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkQueue.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkQueue.java?rev=1059789&view=auto
==============================================================================
--- axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkQueue.java (added)
+++ axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkQueue.java Mon Jan 17 08:04:18 2011
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.axis2.transport.base.threads.watermark;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * This queue acts as a queue with a mark. The methods exposed by the <code>BlockingQueue</code>
+ * interface will add elements up to the mark. We call this mark the waterMark. After the
+ * water mark the all the insertion operations will fails as if the queue is bounded by
+ * this waterMark. After this to add values to the queue the offerAfter method should be called.
+ *
+ * @param <T> The object
+ */
+public interface WaterMarkQueue<T> extends BlockingQueue<T> {
+ /**
+ * Offer the element after the water mark.
+ *
+ * @param object object to be inserted
+ * @return true if the insert is successful
+ */
+ public boolean offerAfter(T object);
+}
+
Added: axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkRejectionHandler.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkRejectionHandler.java?rev=1059789&view=auto
==============================================================================
--- axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkRejectionHandler.java (added)
+++ axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkRejectionHandler.java Mon Jan 17 08:04:18 2011
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.axis2.transport.base.threads.watermark;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+
+/**
+ * This class implements the {@link RejectedExecutionHandler} and provide a mechanism for
+ * having the water mark in the {@link WaterMarkExecutor}. This is an internal class used by
+ * the {@link WaterMarkExecutor}.
+ */
+class WaterMarkRejectionHandler implements RejectedExecutionHandler {
+ RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
+
+ public WaterMarkRejectionHandler(RejectedExecutionHandler rejectedExecutionHandler) {
+ if (rejectedExecutionHandler != null) {
+ this.rejectedExecutionHandler = rejectedExecutionHandler;
+ }
+ }
+
+ public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
+ BlockingQueue q = threadPoolExecutor.getQueue();
+ if (q instanceof WaterMarkQueue) {
+ WaterMarkQueue wq = (WaterMarkQueue) q;
+
+ if (!wq.offerAfter(runnable)) {
+ if (rejectedExecutionHandler != null) {
+ rejectedExecutionHandler.rejectedExecution(runnable, threadPoolExecutor);
+ }
+ }
+ }
+ }
+}
Added: axis/axis2/java/transports/trunk/modules/base/src/test/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkExecutorTest.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/transports/trunk/modules/base/src/test/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkExecutorTest.java?rev=1059789&view=auto
==============================================================================
--- axis/axis2/java/transports/trunk/modules/base/src/test/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkExecutorTest.java (added)
+++ axis/axis2/java/transports/trunk/modules/base/src/test/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkExecutorTest.java Mon Jan 17 08:04:18 2011
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.axis2.transport.base.threads.watermark;
+
+import junit.framework.TestCase;
+
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class WaterMarkExecutorTest extends TestCase {
+
+ private WaterMarkExecutor executor = null;
+
+ private WaterMarkExecutor executor2 = null;
+
+ private final int TASKS = 1000;
+
+ private volatile int runTasks = 0;
+
+ private volatile int[] tasksSubmitted = new int[TASKS];
+
+ private Lock lock = new ReentrantLock();
+
+ @Override
+ protected void setUp() throws Exception {
+ executor = new WaterMarkExecutor(10, 100, 10,
+ TimeUnit.SECONDS, new DefaultWaterMarkQueue<Runnable>(100, 500),
+ new ThreadPoolExecutor.CallerRunsPolicy());
+
+ executor2 = new WaterMarkExecutor(10, 100, 10,
+ TimeUnit.SECONDS, new DefaultWaterMarkQueue<Runnable>(100),
+ new ThreadPoolExecutor.CallerRunsPolicy());
+ }
+
+ public void testExecutor() {
+ for (int i = 0; i < TASKS; i++) {
+ tasksSubmitted[i] = i + 1;
+ }
+
+ for (int i = 0; i < TASKS; i++) {
+ executor.execute(new Test(i + 1, lock));
+ }
+
+ // this is an best effort number so we wait another 1 second for
+ // the executor to finish the tasks
+ while (executor.getActiveCount() > 0) {}
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+
+ }
+
+ int tasks = 0;
+ for (int aTasksSubmitted : tasksSubmitted) {
+ if (aTasksSubmitted != 0) {
+ tasks++;
+ }
+ }
+
+ assertEquals(TASKS, runTasks);
+ assertEquals(tasks, 0);
+
+ executor.shutdown();
+
+ }
+
+ public void testExecutor2() {
+ for (int i = 0; i < TASKS; i++) {
+ tasksSubmitted[i] = i + 1;
+ }
+
+ for (int i = 0; i < TASKS; i++) {
+ executor2.execute(new Test(i + 1, lock));
+ }
+
+ // this is an best effort number so we wait another 1 second for
+ // the executor to finish the tasks
+ while (executor2.getActiveCount() > 0) {}
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+
+ }
+
+ int tasks = 0;
+ for (int aTasksSubmitted : tasksSubmitted) {
+ if (aTasksSubmitted != 0) {
+ tasks++;
+ }
+ }
+
+ assertEquals(TASKS, runTasks);
+ assertEquals(tasks, 0);
+
+ executor2.shutdown();
+
+ }
+
+ private class Test implements Runnable {
+ long taskId;
+ Lock tLock;
+
+ private Test(long taskId, Lock lock) {
+ this.taskId = taskId;
+ tLock = lock;
+ }
+
+ public void run() {
+ tLock.lock();
+ try {
+ runTasks++;
+ for (int i = 0; i < TASKS; i++) {
+ if (taskId == tasksSubmitted[i]) {
+ tasksSubmitted[i] = 0;
+ }
+ }
+ } finally {
+ tLock.unlock();
+ }
+
+ }
+ }
+}