You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-commits@axis.apache.org by su...@apache.org on 2011/01/17 09:04:19 UTC

svn commit: r1059789 - in /axis/axis2/java/transports/trunk/modules/base/src: main/java/org/apache/axis2/transport/base/threads/ main/java/org/apache/axis2/transport/base/threads/watermark/ test/java/org/apache/axis2/transport/base/threads/ test/java/o...

Author: supun
Date: Mon Jan 17 08:04:18 2011
New Revision: 1059789

URL: http://svn.apache.org/viewvc?rev=1059789&view=rev
Log:
adding a ThreadPoolExecutor with a waterMark to control the threading behavior

Added:
    axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/
    axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/DefaultWaterMarkQueue.java
    axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkExecutor.java
    axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkQueue.java
    axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkRejectionHandler.java
    axis/axis2/java/transports/trunk/modules/base/src/test/java/org/apache/axis2/transport/base/threads/
    axis/axis2/java/transports/trunk/modules/base/src/test/java/org/apache/axis2/transport/base/threads/watermark/
    axis/axis2/java/transports/trunk/modules/base/src/test/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkExecutorTest.java
Modified:
    axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/NativeWorkerPool.java
    axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/WorkerPoolFactory.java

Modified: axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/NativeWorkerPool.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/NativeWorkerPool.java?rev=1059789&r1=1059788&r2=1059789&view=diff
==============================================================================
--- axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/NativeWorkerPool.java (original)
+++ axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/NativeWorkerPool.java Mon Jan 17 08:04:18 2011
@@ -19,11 +19,13 @@
 
 package org.apache.axis2.transport.base.threads;
 
+import org.apache.axis2.transport.base.threads.watermark.DefaultWaterMarkQueue;
+import org.apache.axis2.transport.base.threads.watermark.WaterMarkExecutor;
+import org.apache.axis2.transport.base.threads.watermark.WaterMarkQueue;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Worker pool implementation based on java.util.concurrent in JDK 1.5 or later.
@@ -33,7 +35,7 @@ public class NativeWorkerPool implements
     static final Log log = LogFactory.getLog(NativeWorkerPool.class);
 
     private final ThreadPoolExecutor executor;
-    private final LinkedBlockingQueue<Runnable> blockingQueue;
+    private final BlockingQueue<Runnable> blockingQueue;
 
     public NativeWorkerPool(int core, int max, int keepAlive,
         int queueLength, String threadGroupName, String threadGroupId) {
@@ -45,10 +47,123 @@ public class NativeWorkerPool implements
             (queueLength == -1 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queueLength));
         executor = new ThreadPoolExecutor(
-            core, max, keepAlive,
-            TimeUnit.SECONDS,
-            blockingQueue,
-            new NativeThreadFactory(new ThreadGroup(threadGroupName), threadGroupId));
+                core, max, keepAlive,
+                TimeUnit.SECONDS,
+                blockingQueue,
+                new NativeThreadFactory(new ThreadGroup(threadGroupName), threadGroupId));
+    }
+
+    public NativeWorkerPool(int core, int max, int keepAlive,
+                            int queueLength, String threadGroupName,
+                            String threadGroupId, BlockingQueue<Runnable> queue) {
+
+        if (log.isDebugEnabled()) {
+            log.debug("Using native util.concurrent package..");
+        }
+
+        if (queue == null) {
+            blockingQueue =
+                    (queueLength == -1 ? new LinkedBlockingQueue<Runnable>()
+                            : new LinkedBlockingQueue<Runnable>(queueLength));
+        } else {
+            blockingQueue = queue;
+        }
+
+        executor = new ThreadPoolExecutor(
+                core, max, keepAlive,
+                TimeUnit.SECONDS,
+                blockingQueue,
+                new NativeThreadFactory(new ThreadGroup(threadGroupName), threadGroupId));
+    }
+
+    public NativeWorkerPool(int core, int max, int keepAlive,
+                            int queueLength, String threadGroupName,
+                            String threadGroupId, BlockingQueue<Runnable> queue,
+                            RejectedExecutionHandler rejectedExecutionHandler) {
+
+        if (log.isDebugEnabled()) {
+            log.debug("Using native util.concurrent package..");
+        }
+
+        if (queue == null) {
+            blockingQueue =
+                    (queueLength == -1 ? new LinkedBlockingQueue<Runnable>()
+                            : new LinkedBlockingQueue<Runnable>(queueLength));
+        } else {
+            blockingQueue = queue;
+        }
+
+        executor = new ThreadPoolExecutor(
+                core, max, keepAlive,
+                TimeUnit.SECONDS,
+                blockingQueue,
+                new NativeThreadFactory(new ThreadGroup(threadGroupName), threadGroupId),
+                rejectedExecutionHandler);
+    }
+
+    public NativeWorkerPool(int core, int max, int keepAlive,
+                            int queueLength, int waterMark, String threadGroupName,
+                            String threadGroupId) {
+
+        if (log.isDebugEnabled()) {
+            log.debug("Using native util.concurrent package..");
+        }
+
+
+        blockingQueue =
+                (queueLength == -1 ? new DefaultWaterMarkQueue<Runnable>(waterMark)
+                        : new DefaultWaterMarkQueue<Runnable>(waterMark, queueLength));
+
+        executor = new WaterMarkExecutor(
+                core, max, keepAlive,
+                TimeUnit.SECONDS,
+                (WaterMarkQueue<Runnable>) blockingQueue,
+                new NativeThreadFactory(new ThreadGroup(threadGroupName), threadGroupId));
+    }
+
+    public NativeWorkerPool(int core, int max, int keepAlive,
+                            int queueLength, int waterMark, String threadGroupName,
+                            String threadGroupId, WaterMarkQueue<Runnable> queue) {
+
+        if (log.isDebugEnabled()) {
+            log.debug("Using native util.concurrent package..");
+        }
+
+        if (queue == null) {
+            blockingQueue =
+                    (queueLength == -1 ? new DefaultWaterMarkQueue<Runnable>(waterMark)
+                            : new DefaultWaterMarkQueue<Runnable>(waterMark, queueLength));
+        } else {
+            blockingQueue = queue;
+        }
+
+        executor = new WaterMarkExecutor(
+                core, max, keepAlive,
+                TimeUnit.SECONDS,
+                (WaterMarkQueue<Runnable>) blockingQueue,
+                new NativeThreadFactory(new ThreadGroup(threadGroupName), threadGroupId));
+    }
+
+    public NativeWorkerPool(int core, int max, int keepAlive,
+                            int queueLength, int waterMark, String threadGroupName,
+                            String threadGroupId,
+                            RejectedExecutionHandler rejectedExecutionHandler) {
+
+        if (log.isDebugEnabled()) {
+            log.debug("Using native util.concurrent package..");
+        }
+
+
+        blockingQueue =
+                (queueLength == -1 ? new DefaultWaterMarkQueue<Runnable>(waterMark)
+                        : new DefaultWaterMarkQueue<Runnable>(waterMark, queueLength));
+
+        executor = new WaterMarkExecutor(
+                core, max, keepAlive,
+                TimeUnit.SECONDS,
+                (WaterMarkQueue<Runnable>) blockingQueue,
+                new NativeThreadFactory(new ThreadGroup(threadGroupName), threadGroupId),
+                rejectedExecutionHandler);
     }
 
     public void execute(final Runnable task) {

Modified: axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/WorkerPoolFactory.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/WorkerPoolFactory.java?rev=1059789&r1=1059788&r2=1059789&view=diff
==============================================================================
--- axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/WorkerPoolFactory.java (original)
+++ axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/WorkerPoolFactory.java Mon Jan 17 08:04:18 2011
@@ -19,6 +19,8 @@
 
 package org.apache.axis2.transport.base.threads;
 
+import java.util.concurrent.BlockingQueue;
+
 /**
  * Worker pool factory.
  * For the moment this always creates {@link NativeWorkerPool} instances since
@@ -27,8 +29,25 @@ package org.apache.axis2.transport.base.
 public class WorkerPoolFactory {
 
     public static WorkerPool getWorkerPool(int core, int max, int keepAlive,
-        int queueLength, String threadGroupName, String threadGroupId) {
+                                           int queueLength, String threadGroupName,
+                                           String threadGroupId) {
             return new NativeWorkerPool(
                 core, max, keepAlive, queueLength, threadGroupName, threadGroupId);
     }
+
+    public static WorkerPool getWorkerPool(int core, int max, int keepAlive,
+                                           int queueLength, int waterMark, String threadGroupName,
+                                           String threadGroupId) {
+        return new NativeWorkerPool(core, max, keepAlive,
+                queueLength, waterMark, threadGroupName,
+                threadGroupId);
+    }
+
+    public static WorkerPool getWorkerPool(int core, int max, int keepAlive,
+                                           int queueLength, String threadGroupName,
+                                           String threadGroupId, BlockingQueue<Runnable> queue) {
+        return new NativeWorkerPool(core, max, keepAlive,
+                queueLength, threadGroupName,
+                threadGroupId, queue);
+    }
 }

Added: axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/DefaultWaterMarkQueue.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/DefaultWaterMarkQueue.java?rev=1059789&view=auto
==============================================================================
--- axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/DefaultWaterMarkQueue.java (added)
+++ axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/DefaultWaterMarkQueue.java Mon Jan 17 08:04:18 2011
@@ -0,0 +1,285 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.axis2.transport.base.threads.watermark;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * The default implementation of the {@link WaterMarkQueue} interface. Elements are held in an
+ * {@link ArrayBlockingQueue} up to the water mark. Beyond the water mark they are held in a
+ * second queue: an unbounded linked queue if only the water mark is specified, or a bounded
+ * queue holding the remaining <code>size - waterMark</code> elements if a size is given.
+ *
+ * @param <T> the type of the elements held in this queue
+ */
+public class DefaultWaterMarkQueue<T> implements WaterMarkQueue<T> {
+
+    private volatile ArrayBlockingQueue<T> waterMarkQueue;
+
+    private volatile Queue<T> afterWaterMarkQueue;
+
+    private Lock lock = new ReentrantLock();
+
+    /**
+     * Create a {@link WaterMarkQueue} with a waterMark. The queue will first fill up
+     * to waterMark. These items will be inserted into an {@link ArrayBlockingQueue}.
+     * After this a {@link LinkedBlockingQueue} will be used without a bound.
+     *
+     * @param waterMark the waterMark of the queue; must be positive
+     */
+    public DefaultWaterMarkQueue(int waterMark) {
+        // a plain FIFO queue as documented; the deque variant is not needed and is Java 6 only
+        afterWaterMarkQueue = new LinkedBlockingQueue<T>();
+        waterMarkQueue = new ArrayBlockingQueue<T>(waterMark);
+    }
+
+    /**
+     * Create a {@link WaterMarkQueue} with a waterMark and a total size. The first waterMark
+     * elements go to an {@link ArrayBlockingQueue}; elements offered after the water mark
+     * go to a second bounded queue holding at most <code>size - waterMark</code> of them.
+     * @param waterMark the waterMark of the queue; must be positive
+     * @param size the total size of the queue; must not be less than waterMark
+     */
+    public DefaultWaterMarkQueue(int waterMark, int size) {
+        if (waterMark > size) {
+            throw new IllegalArgumentException("Size should be equal or greater than water mark");
+        } else if (waterMark == size) {
+            // ArrayBlockingQueue(0) is illegal; a SynchronousQueue with no taker rejects every offer
+            afterWaterMarkQueue = new SynchronousQueue<T>();
+        } else {
+            afterWaterMarkQueue = new ArrayBlockingQueue<T>(size - waterMark);
+        }
+        waterMarkQueue = new ArrayBlockingQueue<T>(waterMark);
+    }
+
+    public boolean add(T t) {
+        return waterMarkQueue.add(t);
+
+    }
+
+    public boolean offer(T t) {
+        return waterMarkQueue.offer(t);
+    }
+
+    public T remove() {
+        T t = waterMarkQueue.remove();
+        tryMoveTasks();
+        return t;
+    }
+
+    public T poll() {
+        T t = waterMarkQueue.poll();
+        tryMoveTasks();
+        return t;
+    }
+
+    public T element() {
+        return waterMarkQueue.element();
+    }
+
+    public T peek() {
+        return waterMarkQueue.peek();
+    }
+
+    public void put(T t) throws InterruptedException {
+        waterMarkQueue.put(t);
+    }
+
+    public boolean offer(T t, long l, TimeUnit timeUnit) throws InterruptedException {
+        return waterMarkQueue.offer(t, l, timeUnit);
+    }
+
+    public T take() throws InterruptedException {
+        T t = waterMarkQueue.take();
+        tryMoveTasks();
+        return t;
+    }
+
+    public T poll(long l, TimeUnit timeUnit) throws InterruptedException {
+        T t = waterMarkQueue.poll(l, timeUnit);
+        tryMoveTasks();
+        return t;
+    }
+
+    public int remainingCapacity() {
+        return waterMarkQueue.remainingCapacity();
+    }
+
+    public boolean remove(Object o) {
+        boolean b = waterMarkQueue.remove(o) || afterWaterMarkQueue.remove(o); // like contains()
+        tryMoveTasks();
+        return b;
+    }
+
+    public boolean containsAll(Collection<?> objects) {
+        return waterMarkQueue.containsAll(objects);
+    }
+
+    public boolean addAll(Collection<? extends T> ts) {
+        return waterMarkQueue.addAll(ts);
+    }
+
+    public boolean removeAll(Collection<?> objects) {
+        boolean b = waterMarkQueue.removeAll(objects);
+        tryMoveTasks();
+
+        return b;
+    }
+
+    public boolean retainAll(Collection<?> objects) {
+        return waterMarkQueue.retainAll(objects);
+    }
+
+    public void clear() {
+        waterMarkQueue.clear();
+        afterWaterMarkQueue.clear();
+    }
+
+    public int size() {
+        return waterMarkQueue.size() + afterWaterMarkQueue.size();
+    }
+
+    public boolean isEmpty() {
+        tryMoveTasks();
+        return waterMarkQueue.isEmpty();
+    }
+
+    private void tryMoveTasks() {
+        if (afterWaterMarkQueue.size() > 0) {
+            lock.lock();
+            try {
+                // Peek instead of poll: a task that does not fit must stay at the head of the
+                // overflow queue (poll + re-offer sent it to the tail, breaking FIFO order),
+                // and a null head after a concurrent clear() must never be offered.
+                T w = afterWaterMarkQueue.peek();
+                while (w != null && waterMarkQueue.offer(w)) {
+                    afterWaterMarkQueue.remove(w);
+                    w = afterWaterMarkQueue.peek();
+                }
+            } finally {
+                lock.unlock();
+            }
+        }
+    }
+
+    public boolean contains(Object o) {
+        return waterMarkQueue.contains(o) || afterWaterMarkQueue.contains(o);
+    }
+
+    public Iterator<T> iterator() {
+        return new IteratorImpl();
+    }
+
+    public Object[] toArray() {
+        return toArray(new Object[0]); // both queues, in line with size() and iterator()
+    }
+
+    /**
+     * Copies the elements of both queues, water mark queue first, following the
+     * {@link Collection#toArray(Object[])} contract.
+     */
+    @SuppressWarnings("unchecked")
+    public <E> E[] toArray(E[] ts) {
+        // Snapshot each queue separately: handing the same ts to both made the second
+        // copy overwrite the first whenever ts was large enough to be reused.
+        Object[] head = waterMarkQueue.toArray();
+        Object[] tail = afterWaterMarkQueue.toArray();
+        int total = head.length + tail.length;
+        E[] result = ts.length >= total ? ts
+                : (E[]) java.lang.reflect.Array.newInstance(ts.getClass().getComponentType(), total);
+        System.arraycopy(head, 0, result, 0, head.length);
+        System.arraycopy(tail, 0, result, head.length, tail.length);
+        if (result.length > total) {
+            result[total] = null; // end marker required by the contract
+        }
+        return result;
+    }
+
+    public int drainTo(Collection<? super T> objects) {
+        int n = waterMarkQueue.drainTo(objects);
+        tryMoveTasks();
+
+        return n;
+    }
+
+    public int drainTo(Collection<? super T> objects, int i) {
+        int n = waterMarkQueue.drainTo(objects, i);
+        tryMoveTasks();
+        return n;
+    }
+
+    public boolean offerAfter(T t) {
+        lock.lock();
+        try {
+            return afterWaterMarkQueue.offer(t);
+        } finally {
+            lock.unlock();
+            tryMoveTasks(); // workers wait on the main queue only; refill it so t is not stranded
+        }
+    }
+    /**
+     * Iterator for DefaultWaterMarkQueue
+     */
+    private class IteratorImpl implements Iterator<T> {
+        Iterator<T> waterMarkIterator = null;
+
+        Iterator<T> afterWaterMarkIterator = null;
+
+        boolean waterMarkQueueDone = false;
+
+        private IteratorImpl() {
+            waterMarkIterator = waterMarkQueue.iterator();
+            afterWaterMarkIterator = afterWaterMarkQueue.iterator();
+
+            waterMarkQueueDone = false;
+        }
+
+        public boolean hasNext() {
+            return waterMarkIterator.hasNext() || afterWaterMarkIterator.hasNext();
+        }
+
+        public T next() {
+            lock.lock();
+            try {
+                if (waterMarkIterator.hasNext()) {
+                    return waterMarkIterator.next();
+                } else {
+                    waterMarkQueueDone = true;
+                    return afterWaterMarkIterator.next();
+                }
+            } finally {
+                lock.unlock();
+            }
+        }
+
+        public void remove() {
+            if (!waterMarkQueueDone) {
+                waterMarkIterator.remove();
+            } else {
+                afterWaterMarkIterator.remove();
+            }
+        }
+    }
+}

Added: axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkExecutor.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkExecutor.java?rev=1059789&view=auto
==============================================================================
--- axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkExecutor.java (added)
+++ axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkExecutor.java Mon Jan 17 08:04:18 2011
@@ -0,0 +1,63 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.axis2.transport.base.threads.watermark;
+
+import java.util.concurrent.*;
+
+/**
+ * An {@link ExecutorService} that executes each submitted task using
+ * one of possibly several pooled threads, but schedules work differently
+ * from a plain {@link ThreadPoolExecutor}. Once all the core pool threads are busy,
+ * tasks are queued until the water mark is reached. If more tasks are submitted after
+ * the queue has filled up to the water mark, the number of threads increases up to max.
+ * If tasks keep arriving after that, the rest of the queue begins to fill up. If the queue
+ * is bounded and completely full, the {@link RejectedExecutionHandler} supplied to the
+ * constructor is invoked; if none was supplied, the task is rejected.
+ */
+public class WaterMarkExecutor extends ThreadPoolExecutor {
+    public WaterMarkExecutor(int core, int max, long keepAlive,
+                             TimeUnit timeUnit, WaterMarkQueue<Runnable> queue) {
+        super(core, max, keepAlive, timeUnit, queue, new WaterMarkRejectionHandler(null));
+    }
+
+    public WaterMarkExecutor(int core, int max, long keepAlive,
+                             TimeUnit timeUnit, WaterMarkQueue<Runnable> queue,
+                             ThreadFactory threadFactory) {
+        super(core, max, keepAlive,
+                timeUnit, queue, threadFactory, new WaterMarkRejectionHandler(null));
+    }
+
+    public WaterMarkExecutor(int core, int max,
+                             long keepAlive, TimeUnit timeUnit,
+                             WaterMarkQueue<Runnable> queue,
+                             RejectedExecutionHandler rejectedExecutionHandler) {
+
+        super(core, max, keepAlive, timeUnit,
+                queue, new WaterMarkRejectionHandler(rejectedExecutionHandler));
+    }
+
+    public WaterMarkExecutor(int core, int max, long keepAlive,
+                             TimeUnit timeUnit, WaterMarkQueue<Runnable> queue,
+                             ThreadFactory threadFactory,
+                             RejectedExecutionHandler rejectedExecutionHandler) {
+        super(core, max, keepAlive, timeUnit,
+                queue, threadFactory, new WaterMarkRejectionHandler(rejectedExecutionHandler));
+    }
+}

Added: axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkQueue.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkQueue.java?rev=1059789&view=auto
==============================================================================
--- axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkQueue.java (added)
+++ axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkQueue.java Mon Jan 17 08:04:18 2011
@@ -0,0 +1,41 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.axis2.transport.base.threads.watermark;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * This queue acts as a queue with a mark. The methods exposed by the <code>BlockingQueue</code>
+ * interface will add elements up to the mark. We call this mark the waterMark. Once the
+ * water mark is reached, all the insertion operations behave as if the queue were bounded by
+ * the waterMark. To add further elements, the offerAfter method should be called.
+ *
+ * @param <T> The object
+ */
+public interface WaterMarkQueue<T> extends BlockingQueue<T> {
+    /**
+     * Offer the element after the water mark.
+     *
+     * @param object object to be inserted
+     * @return true if the insert is successful
+     */
+    public boolean offerAfter(T object);
+}
+

Added: axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkRejectionHandler.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkRejectionHandler.java?rev=1059789&view=auto
==============================================================================
--- axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkRejectionHandler.java (added)
+++ axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkRejectionHandler.java Mon Jan 17 08:04:18 2011
@@ -0,0 +1,52 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.axis2.transport.base.threads.watermark;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+
+/**
+ * This class implements the {@link RejectedExecutionHandler} and provides the mechanism
+ * behind the water mark in the {@link WaterMarkExecutor}. This is an internal class used by
+ * the {@link WaterMarkExecutor}.
+ */
+class WaterMarkRejectionHandler implements RejectedExecutionHandler {
+    RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
+
+    public WaterMarkRejectionHandler(RejectedExecutionHandler rejectedExecutionHandler) {
+        if (rejectedExecutionHandler != null) {
+            this.rejectedExecutionHandler = rejectedExecutionHandler;
+        }
+    }
+
+    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
+        BlockingQueue<Runnable> q = threadPoolExecutor.getQueue();
+        // Park the task beyond the water mark only while the pool still accepts work; a
+        // task parked after shutdown could be left in the queue and never run.
+        boolean parked = !threadPoolExecutor.isShutdown()
+                && q instanceof WaterMarkQueue
+                && ((WaterMarkQueue<Runnable>) q).offerAfter(runnable);
+        if (!parked) {
+            // shut down, overflow full or not a WaterMarkQueue: never drop the task silently
+            rejectedExecutionHandler.rejectedExecution(runnable, threadPoolExecutor);
+        }
+    }
+}

Added: axis/axis2/java/transports/trunk/modules/base/src/test/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkExecutorTest.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/transports/trunk/modules/base/src/test/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkExecutorTest.java?rev=1059789&view=auto
==============================================================================
--- axis/axis2/java/transports/trunk/modules/base/src/test/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkExecutorTest.java (added)
+++ axis/axis2/java/transports/trunk/modules/base/src/test/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkExecutorTest.java Mon Jan 17 08:04:18 2011
@@ -0,0 +1,142 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.axis2.transport.base.threads.watermark;
+
+import junit.framework.TestCase;
+
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class WaterMarkExecutorTest extends TestCase {
+
+    private WaterMarkExecutor executor = null;
+
+    private WaterMarkExecutor executor2 = null;
+
+    private final int TASKS = 1000;
+
+    private volatile int runTasks = 0;
+
+    private volatile int[] tasksSubmitted = new int[TASKS];
+
+    private Lock lock = new ReentrantLock();
+
+    @Override
+    protected void setUp() throws Exception {
+        executor = new WaterMarkExecutor(10, 100, 10,
+            TimeUnit.SECONDS, new DefaultWaterMarkQueue<Runnable>(100, 500),
+            new ThreadPoolExecutor.CallerRunsPolicy());
+
+        executor2 = new WaterMarkExecutor(10, 100, 10,
+            TimeUnit.SECONDS, new DefaultWaterMarkQueue<Runnable>(100),
+            new ThreadPoolExecutor.CallerRunsPolicy());
+    }
+
+    public void testExecutor() throws InterruptedException {
+        // mark every task id as pending; each task clears its own slot when it runs
+        for (int i = 0; i < TASKS; i++) {
+            tasksSubmitted[i] = i + 1;
+        }
+
+        try {
+            for (int i = 0; i < TASKS; i++) {
+                executor.execute(new Test(i + 1, lock));
+            }
+        } finally {
+            // stop accepting work but let everything already accepted run to completion
+            executor.shutdown();
+        }
+
+        // Wait for real termination instead of spinning on getActiveCount(), which is only
+        // an approximation, and then sleeping for a fixed second.
+        assertTrue("tasks did not finish in time",
+                executor.awaitTermination(30, TimeUnit.SECONDS));
+
+        int tasks = 0;
+        for (int aTasksSubmitted : tasksSubmitted) {
+            if (aTasksSubmitted != 0) {
+                tasks++;
+            }
+        }
+
+        // every task ran exactly once and cleared its slot
+        assertEquals(TASKS, runTasks);
+        assertEquals(0, tasks);
+    }
+
+    public void testExecutor2() throws InterruptedException {
+        // mark every task id as pending; each task clears its own slot when it runs
+        for (int i = 0; i < TASKS; i++) {
+            tasksSubmitted[i] = i + 1;
+        }
+
+        try {
+            for (int i = 0; i < TASKS; i++) {
+                executor2.execute(new Test(i + 1, lock));
+            }
+        } finally {
+            // stop accepting work but let everything already accepted run to completion
+            executor2.shutdown();
+        }
+
+        // Wait for real termination instead of spinning on getActiveCount(), which is only
+        // an approximation, and then sleeping for a fixed second.
+        assertTrue("tasks did not finish in time",
+                executor2.awaitTermination(30, TimeUnit.SECONDS));
+
+        int tasks = 0;
+        for (int aTasksSubmitted : tasksSubmitted) {
+            if (aTasksSubmitted != 0) {
+                tasks++;
+            }
+        }
+
+        // every task ran exactly once and cleared its slot
+        assertEquals(TASKS, runTasks);
+        assertEquals(0, tasks);
+    }
+
+    private class Test implements Runnable {
+        long taskId;
+        Lock tLock;
+
+        private Test(long taskId, Lock lock) {
+            this.taskId = taskId;
+            tLock = lock;
+        }
+
+        public void run() {
+                tLock.lock();
+                try {
+                    runTasks++;
+                    for (int i = 0; i < TASKS; i++) {
+                        if (taskId == tasksSubmitted[i]) {
+                            tasksSubmitted[i] = 0;
+                        }
+                    }
+                } finally {
+                    tLock.unlock();
+                }
+
+        }
+    }
+}