You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2006/10/24 05:18:02 UTC

svn commit: r467222 [25/31] - in /tomcat/tc6.0.x/trunk/java: javax/annotation/ javax/annotation/security/ javax/ejb/ javax/el/ javax/mail/ javax/mail/internet/ javax/persistence/ javax/servlet/ javax/servlet/http/ javax/servlet/jsp/ javax/servlet/jsp/e...

Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/ThreadPool.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/ThreadPool.java?view=diff&rev=467222&r1=467221&r2=467222
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/ThreadPool.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/ThreadPool.java Mon Oct 23 20:17:11 2006
@@ -1,165 +1,165 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.catalina.tribes.transport;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * @author not attributable
- * @version 1.0
- */
-
-public class ThreadPool
-{
-    /**
-     * A very simple thread pool class.  The pool size is set at
-     * construction time and remains fixed.  Threads are cycled
-     * through a FIFO idle queue.
-     */
-
-    List idle = new LinkedList();
-    List used = new LinkedList();
-    
-    Object mutex = new Object();
-    boolean running = true;
-    
-    private static int counter = 1;
-    private int maxThreads;
-    private int minThreads;
-    
-    private ThreadCreator creator = null;
-
-    private static synchronized int inc() {
-        return counter++;
-    }
-
-    
-    public ThreadPool (int maxThreads, int minThreads, ThreadCreator creator) throws Exception {
-        // fill up the pool with worker threads
-        this.maxThreads = maxThreads;
-        this.minThreads = minThreads;
-        this.creator = creator;
-        //for (int i = 0; i < minThreads; i++) {
-        for (int i = 0; i < maxThreads; i++) { //temporary fix for thread hand off problem
-            WorkerThread thread = creator.getWorkerThread();
-            setupThread(thread);
-            idle.add (thread);
-        }
-    }
-    
-    protected void setupThread(WorkerThread thread) {
-        synchronized (thread) {
-            thread.setPool(this);
-            thread.setName(thread.getClass().getName() + "[" + inc() + "]");
-            thread.setDaemon(true);
-            thread.setPriority(Thread.MAX_PRIORITY);
-            thread.start();
-            try {thread.wait(500); }catch ( InterruptedException x ) {}
-        }
-    }
-
-    /**
-     * Find an idle worker thread, if any.  Could return null.
-     */
-    public WorkerThread getWorker()
-    {
-        WorkerThread worker = null;
-        synchronized (mutex) {
-            while ( worker == null && running ) {
-                if (idle.size() > 0) {
-                    try {
-                        worker = (WorkerThread) idle.remove(0);
-                    } catch (java.util.NoSuchElementException x) {
-                        //this means that there are no available workers
-                        worker = null;
-                    }
-                } else if ( used.size() < this.maxThreads && creator != null) {
-                    worker = creator.getWorkerThread();
-                    setupThread(worker);
-                } else {
-                    try { mutex.wait(); } catch ( java.lang.InterruptedException x ) {Thread.currentThread().interrupted();}
-                }
-            }//while
-            if ( worker != null ) used.add(worker);
-        }
-        return (worker);
-    }
-    
-    public int available() {
-        return idle.size();
-    }
-
-    /**
-     * Called by the worker thread to return itself to the
-     * idle pool.
-     */
-    public void returnWorker (WorkerThread worker) {
-        if ( running ) {
-            synchronized (mutex) {
-                used.remove(worker);
-                //if ( idle.size() < minThreads && !idle.contains(worker)) idle.add(worker);
-                if ( idle.size() < maxThreads && !idle.contains(worker)) idle.add(worker); //let max be the upper limit
-                else {
-                    worker.setDoRun(false);
-                    synchronized (worker){worker.notify();}
-                }
-                mutex.notify();
-            }
-        }else {
-            worker.setDoRun(false);
-            synchronized (worker){worker.notify();}
-        }
-    }
-
-    public int getMaxThreads() {
-        return maxThreads;
-    }
-
-    public int getMinThreads() {
-        return minThreads;
-    }
-
-    public void stop() {
-        running = false;
-        synchronized (mutex) {
-            Iterator i = idle.iterator();
-            while ( i.hasNext() ) {
-                WorkerThread worker = (WorkerThread)i.next();
-                returnWorker(worker);
-                i.remove();
-            }
-        }
-    }
-
-    public void setMaxThreads(int maxThreads) {
-        this.maxThreads = maxThreads;
-    }
-
-    public void setMinThreads(int minThreads) {
-        this.minThreads = minThreads;
-    }
-
-    public ThreadCreator getThreadCreator() {
-        return this.creator;
-    }
-    
-    public static interface ThreadCreator {
-        public WorkerThread getWorkerThread();
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.transport;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * @author not attributable
+ * @version 1.0
+ */
+
+public class ThreadPool
+{
+    /**
+     * A very simple thread pool class.  The pool size is set at
+     * construction time and remains fixed.  Threads are cycled
+     * through a FIFO idle queue.
+     */
+
+    List idle = new LinkedList();
+    List used = new LinkedList();
+    
+    Object mutex = new Object();
+    boolean running = true;
+    
+    private static int counter = 1;
+    private int maxThreads;
+    private int minThreads;
+    
+    private ThreadCreator creator = null;
+
+    private static synchronized int inc() {
+        return counter++;
+    }
+
+    
+    public ThreadPool (int maxThreads, int minThreads, ThreadCreator creator) throws Exception {
+        // fill up the pool with worker threads
+        this.maxThreads = maxThreads;
+        this.minThreads = minThreads;
+        this.creator = creator;
+        //for (int i = 0; i < minThreads; i++) {
+        for (int i = 0; i < maxThreads; i++) { //temporary fix for thread hand off problem
+            WorkerThread thread = creator.getWorkerThread();
+            setupThread(thread);
+            idle.add (thread);
+        }
+    }
+    
+    protected void setupThread(WorkerThread thread) {
+        synchronized (thread) {
+            thread.setPool(this);
+            thread.setName(thread.getClass().getName() + "[" + inc() + "]");
+            thread.setDaemon(true);
+            thread.setPriority(Thread.MAX_PRIORITY);
+            thread.start();
+            try {thread.wait(500); }catch ( InterruptedException x ) {}
+        }
+    }
+
+    /**
+     * Find an idle worker thread, if any.  Could return null.
+     */
+    public WorkerThread getWorker()
+    {
+        WorkerThread worker = null;
+        synchronized (mutex) {
+            while ( worker == null && running ) {
+                if (idle.size() > 0) {
+                    try {
+                        worker = (WorkerThread) idle.remove(0);
+                    } catch (java.util.NoSuchElementException x) {
+                        //this means that there are no available workers
+                        worker = null;
+                    }
+                } else if ( used.size() < this.maxThreads && creator != null) {
+                    worker = creator.getWorkerThread();
+                    setupThread(worker);
+                } else {
+                    try { mutex.wait(); } catch ( java.lang.InterruptedException x ) {Thread.currentThread().interrupted();}
+                }
+            }//while
+            if ( worker != null ) used.add(worker);
+        }
+        return (worker);
+    }
+    
+    public int available() {
+        return idle.size();
+    }
+
+    /**
+     * Called by the worker thread to return itself to the
+     * idle pool.
+     */
+    public void returnWorker (WorkerThread worker) {
+        if ( running ) {
+            synchronized (mutex) {
+                used.remove(worker);
+                //if ( idle.size() < minThreads && !idle.contains(worker)) idle.add(worker);
+                if ( idle.size() < maxThreads && !idle.contains(worker)) idle.add(worker); //let max be the upper limit
+                else {
+                    worker.setDoRun(false);
+                    synchronized (worker){worker.notify();}
+                }
+                mutex.notify();
+            }
+        }else {
+            worker.setDoRun(false);
+            synchronized (worker){worker.notify();}
+        }
+    }
+
+    public int getMaxThreads() {
+        return maxThreads;
+    }
+
+    public int getMinThreads() {
+        return minThreads;
+    }
+
+    public void stop() {
+        running = false;
+        synchronized (mutex) {
+            Iterator i = idle.iterator();
+            while ( i.hasNext() ) {
+                WorkerThread worker = (WorkerThread)i.next();
+                returnWorker(worker);
+                i.remove();
+            }
+        }
+    }
+
+    public void setMaxThreads(int maxThreads) {
+        this.maxThreads = maxThreads;
+    }
+
+    public void setMinThreads(int minThreads) {
+        this.minThreads = minThreads;
+    }
+
+    public ThreadCreator getThreadCreator() {
+        return this.creator;
+    }
+    
+    public static interface ThreadCreator {
+        public WorkerThread getWorkerThread();
+    }
+}

Propchange: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/ThreadPool.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/ThreadPool.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/WorkerThread.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/WorkerThread.java?view=diff&rev=467222&r1=467221&r2=467222
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/WorkerThread.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/WorkerThread.java Mon Oct 23 20:17:11 2006
@@ -1,89 +1,89 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.catalina.tribes.transport;
-
-import org.apache.catalina.tribes.io.ListenCallback;
-
-
-
-
-/**
- * @author Filip Hanik
- * @version $Revision: 366253 $ $Date: 2006-01-05 13:30:42 -0600 (Thu, 05 Jan 2006) $
- */
-public abstract class WorkerThread extends Thread 
-{
-    
-    public static final int OPTION_DIRECT_BUFFER = ReceiverBase.OPTION_DIRECT_BUFFER;
-    
-    private ListenCallback callback;
-    private ThreadPool pool;
-    private boolean doRun = true;
-    private int options;
-    protected boolean useBufferPool = true;
-
-    public WorkerThread(ListenCallback callback) {
-        this.callback = callback;
-    }
-
-    public void setPool(ThreadPool pool) {
-        this.pool = pool;
-    }
-
-    public void setOptions(int options) {
-        this.options = options;
-    }
-
-    public void setCallback(ListenCallback callback) {
-        this.callback = callback;
-    }
-
-    public void setDoRun(boolean doRun) {
-        this.doRun = doRun;
-    }
-
-    public ThreadPool getPool() {
-        return pool;
-    }
-
-    public int getOptions() {
-        return options;
-    }
-
-    public ListenCallback getCallback() {
-        return callback;
-    }
-
-    public boolean isDoRun() {
-        return doRun;
-    }
-
-    public void close()
-    {
-        doRun = false;
-        notify();
-    }
-    
-    public void setUseBufferPool(boolean usebufpool) {
-        useBufferPool = usebufpool;
-    }
-    
-    public boolean getUseBufferPool() {
-        return useBufferPool;
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.transport;
+
+import org.apache.catalina.tribes.io.ListenCallback;
+
+
+
+
+/**
+ * @author Filip Hanik
+ * @version $Revision$ $Date$
+ */
+public abstract class WorkerThread extends Thread 
+{
+    
+    public static final int OPTION_DIRECT_BUFFER = ReceiverBase.OPTION_DIRECT_BUFFER;
+    
+    private ListenCallback callback;
+    private ThreadPool pool;
+    private boolean doRun = true;
+    private int options;
+    protected boolean useBufferPool = true;
+
+    public WorkerThread(ListenCallback callback) {
+        this.callback = callback;
+    }
+
+    public void setPool(ThreadPool pool) {
+        this.pool = pool;
+    }
+
+    public void setOptions(int options) {
+        this.options = options;
+    }
+
+    public void setCallback(ListenCallback callback) {
+        this.callback = callback;
+    }
+
+    public void setDoRun(boolean doRun) {
+        this.doRun = doRun;
+    }
+
+    public ThreadPool getPool() {
+        return pool;
+    }
+
+    public int getOptions() {
+        return options;
+    }
+
+    public ListenCallback getCallback() {
+        return callback;
+    }
+
+    public boolean isDoRun() {
+        return doRun;
+    }
+
+    public void close()
+    {
+        doRun = false;
+        notify();
+    }
+    
+    public void setUseBufferPool(boolean usebufpool) {
+        useBufferPool = usebufpool;
+    }
+    
+    public boolean getUseBufferPool() {
+        return useBufferPool;
+    }
+}

Propchange: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/WorkerThread.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/WorkerThread.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Propchange: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/BioReceiver.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/BioReceiver.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java?view=diff&rev=467222&r1=467221&r2=467222
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java Mon Oct 23 20:17:11 2006
@@ -1,181 +1,181 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.catalina.tribes.transport.bio;
-import java.io.IOException;
-
-import org.apache.catalina.tribes.io.ObjectReader;
-import org.apache.catalina.tribes.transport.Constants;
-import org.apache.catalina.tribes.transport.WorkerThread;
-import java.net.Socket;
-import java.io.InputStream;
-import org.apache.catalina.tribes.transport.ReceiverBase;
-import java.io.OutputStream;
-import org.apache.catalina.tribes.io.ListenCallback;
-import org.apache.catalina.tribes.ChannelMessage;
-import org.apache.catalina.tribes.io.ChannelData;
-import org.apache.catalina.tribes.io.BufferPool;
-
-/**
- * A worker thread class which can drain channels and echo-back the input. Each
- * instance is constructed with a reference to the owning thread pool object.
- * When started, the thread loops forever waiting to be awakened to service the
- * channel associated with a SelectionKey object. The worker is tasked by
- * calling its serviceChannel() method with a SelectionKey object. The
- * serviceChannel() method stores the key reference in the thread object then
- * calls notify() to wake it up. When the channel has been drained, the worker
- * thread returns itself to its parent pool.
- * 
- * @author Filip Hanik
- * 
- * @version $Revision: 378050 $, $Date: 2006-02-15 12:30:02 -0600 (Wed, 15 Feb 2006) $
- */
-public class BioReplicationThread extends WorkerThread {
-
-
-    protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog( BioReplicationThread.class );
-    
-    protected Socket socket;
-    protected ObjectReader reader;
-    
-    public BioReplicationThread (ListenCallback callback) {
-        super(callback);
-    }
-
-    // loop forever waiting for work to do
-    public synchronized void run()
-    {
-        this.notify();
-        while (isDoRun()) {
-            try {
-                // sleep and release object lock
-                this.wait();
-            } catch (InterruptedException e) {
-                if(log.isInfoEnabled())
-                    log.info("TCP worker thread interrupted in cluster",e);
-                // clear interrupt status
-                Thread.interrupted();
-            }
-            if ( socket == null ) continue;
-            try {
-                drainSocket();
-            } catch ( Exception x ) {
-                log.error("Unable to service bio socket");
-            }finally {
-                try {socket.close();}catch ( Exception ignore){}
-                try {reader.close();}catch ( Exception ignore){}
-                reader = null;
-                socket = null;
-            }
-            // done, ready for more, return to pool
-            if ( getPool() != null ) getPool().returnWorker (this);
-            else setDoRun(false);
-        }
-    }
-
-    
-    public synchronized void serviceSocket(Socket socket, ObjectReader reader) {
-        this.socket = socket;
-        this.reader = reader;
-        this.notify();		// awaken the thread
-    }
-    
-    protected void execute(ObjectReader reader) throws Exception{
-        int pkgcnt = reader.count();
-
-        if ( pkgcnt > 0 ) {
-            ChannelMessage[] msgs = reader.execute();
-            for ( int i=0; i<msgs.length; i++ ) {
-                /**
-                 * Use send ack here if you want to ack the request to the remote 
-                 * server before completing the request
-                 * This is considered an asynchronized request
-                 */
-                if (ChannelData.sendAckAsync(msgs[i].getOptions())) sendAck(Constants.ACK_COMMAND);
-                try {
-                    //process the message
-                    getCallback().messageDataReceived(msgs[i]);
-                    /**
-                     * Use send ack here if you want the request to complete on this
-                     * server before sending the ack to the remote server
-                     * This is considered a synchronized request
-                     */
-                    if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(Constants.ACK_COMMAND);
-                }catch  ( Exception x ) {
-                    if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(Constants.FAIL_ACK_COMMAND);
-                    log.error("Error thrown from messageDataReceived.",x);
-                }
-                if ( getUseBufferPool() ) {
-                    BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage());
-                    msgs[i].setMessage(null);
-                }
-            }                       
-        }
-
-       
-    }
-
-    /**
-     * The actual code which drains the channel associated with
-     * the given key.  This method assumes the key has been
-     * modified prior to invocation to turn off selection
-     * interest in OP_READ.  When this method completes it
-     * re-enables OP_READ and calls wakeup() on the selector
-     * so the selector will resume watching this channel.
-     */
-    protected void drainSocket () throws Exception {
-        InputStream in = socket.getInputStream();
-        // loop while data available, channel is non-blocking
-        byte[] buf = new byte[1024];
-        int length = in.read(buf);
-        while ( length >= 0 ) {
-            int count = reader.append(buf,0,length,true);
-            if ( count > 0 ) execute(reader);
-            length = in.read(buf);
-        }
-    }
-
-
-
-
-    /**
-     * send a reply-acknowledgement (6,2,3)
-     * @param key
-     * @param channel
-     */
-    protected void sendAck(byte[] command) {
-        try {
-            OutputStream out = socket.getOutputStream();
-            out.write(command);
-            out.flush();
-            if (log.isTraceEnabled()) {
-                log.trace("ACK sent to " + socket.getPort());
-            }
-        } catch ( java.io.IOException x ) {
-            log.warn("Unable to send ACK back through channel, channel disconnected?: "+x.getMessage());
-        }
-    }
-    
-    public void close() {
-        setDoRun(false);
-        try {socket.close();}catch ( Exception ignore){}
-        try {reader.close();}catch ( Exception ignore){}
-        reader = null;
-        socket = null;
-        super.close();
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.transport.bio;
+import java.io.IOException;
+
+import org.apache.catalina.tribes.io.ObjectReader;
+import org.apache.catalina.tribes.transport.Constants;
+import org.apache.catalina.tribes.transport.WorkerThread;
+import java.net.Socket;
+import java.io.InputStream;
+import org.apache.catalina.tribes.transport.ReceiverBase;
+import java.io.OutputStream;
+import org.apache.catalina.tribes.io.ListenCallback;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.io.ChannelData;
+import org.apache.catalina.tribes.io.BufferPool;
+
+/**
+ * A worker thread class which can drain channels and echo-back the input. Each
+ * instance is constructed with a reference to the owning thread pool object.
+ * When started, the thread loops forever waiting to be awakened to service the
+ * channel associated with a SelectionKey object. The worker is tasked by
+ * calling its serviceChannel() method with a SelectionKey object. The
+ * serviceChannel() method stores the key reference in the thread object then
+ * calls notify() to wake it up. When the channel has been drained, the worker
+ * thread returns itself to its parent pool.
+ * 
+ * @author Filip Hanik
+ * 
+ * @version $Revision$, $Date$
+ */
+public class BioReplicationThread extends WorkerThread {
+
+
+    protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog( BioReplicationThread.class );
+    
+    protected Socket socket;
+    protected ObjectReader reader;
+    
+    public BioReplicationThread (ListenCallback callback) {
+        super(callback);
+    }
+
+    // loop forever waiting for work to do
+    public synchronized void run()
+    {
+        this.notify();
+        while (isDoRun()) {
+            try {
+                // sleep and release object lock
+                this.wait();
+            } catch (InterruptedException e) {
+                if(log.isInfoEnabled())
+                    log.info("TCP worker thread interrupted in cluster",e);
+                // clear interrupt status
+                Thread.interrupted();
+            }
+            if ( socket == null ) continue;
+            try {
+                drainSocket();
+            } catch ( Exception x ) {
+                log.error("Unable to service bio socket");
+            }finally {
+                try {socket.close();}catch ( Exception ignore){}
+                try {reader.close();}catch ( Exception ignore){}
+                reader = null;
+                socket = null;
+            }
+            // done, ready for more, return to pool
+            if ( getPool() != null ) getPool().returnWorker (this);
+            else setDoRun(false);
+        }
+    }
+
+    
+    public synchronized void serviceSocket(Socket socket, ObjectReader reader) {
+        this.socket = socket;
+        this.reader = reader;
+        this.notify();		// awaken the thread
+    }
+    
+    protected void execute(ObjectReader reader) throws Exception{
+        int pkgcnt = reader.count();
+
+        if ( pkgcnt > 0 ) {
+            ChannelMessage[] msgs = reader.execute();
+            for ( int i=0; i<msgs.length; i++ ) {
+                /**
+                 * Use send ack here if you want to ack the request to the remote 
+                 * server before completing the request
+                 * This is considered an asynchronized request
+                 */
+                if (ChannelData.sendAckAsync(msgs[i].getOptions())) sendAck(Constants.ACK_COMMAND);
+                try {
+                    //process the message
+                    getCallback().messageDataReceived(msgs[i]);
+                    /**
+                     * Use send ack here if you want the request to complete on this
+                     * server before sending the ack to the remote server
+                     * This is considered a synchronized request
+                     */
+                    if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(Constants.ACK_COMMAND);
+                }catch  ( Exception x ) {
+                    if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(Constants.FAIL_ACK_COMMAND);
+                    log.error("Error thrown from messageDataReceived.",x);
+                }
+                if ( getUseBufferPool() ) {
+                    BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage());
+                    msgs[i].setMessage(null);
+                }
+            }                       
+        }
+
+       
+    }
+
+    /**
+     * The actual code which drains the channel associated with
+     * the given key.  This method assumes the key has been
+     * modified prior to invocation to turn off selection
+     * interest in OP_READ.  When this method completes it
+     * re-enables OP_READ and calls wakeup() on the selector
+     * so the selector will resume watching this channel.
+     */
+    protected void drainSocket () throws Exception {
+        InputStream in = socket.getInputStream();
+        // loop while data available, channel is non-blocking
+        byte[] buf = new byte[1024];
+        int length = in.read(buf);
+        while ( length >= 0 ) {
+            int count = reader.append(buf,0,length,true);
+            if ( count > 0 ) execute(reader);
+            length = in.read(buf);
+        }
+    }
+
+
+
+
+    /**
+     * send a reply-acknowledgement (6,2,3)
+     * @param key
+     * @param channel
+     */
+    protected void sendAck(byte[] command) {
+        try {
+            OutputStream out = socket.getOutputStream();
+            out.write(command);
+            out.flush();
+            if (log.isTraceEnabled()) {
+                log.trace("ACK sent to " + socket.getPort());
+            }
+        } catch ( java.io.IOException x ) {
+            log.warn("Unable to send ACK back through channel, channel disconnected?: "+x.getMessage());
+        }
+    }
+    
+    public void close() {
+        setDoRun(false);
+        try {socket.close();}catch ( Exception ignore){}
+        try {reader.close();}catch ( Exception ignore){}
+        reader = null;
+        socket = null;
+        super.close();
+    }
+}

Propchange: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/BioSender.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/BioSender.java?view=diff&rev=467222&r1=467221&r2=467222
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/BioSender.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/BioSender.java Mon Oct 23 20:17:11 2006
@@ -39,7 +39,7 @@
  * 
  * @author Peter Rossbach
  * @author Filip Hanik
- * @version $Revision: 377484 $ $Date: 2006-02-13 15:00:05 -0600 (Mon, 13 Feb 2006) $
+ * @version $Revision$ $Date$
  * @since 5.5.16
  */
 public class BioSender extends AbstractSender implements DataSender {

Propchange: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/BioSender.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/BioSender.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Propchange: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/MultipointBioSender.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/MultipointBioSender.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Propchange: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/PooledMultiSender.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/PooledMultiSender.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/util/FastQueue.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/util/FastQueue.java?view=diff&rev=467222&r1=467221&r2=467222
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/util/FastQueue.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/util/FastQueue.java Mon Oct 23 20:17:11 2006
@@ -1,394 +1,394 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.catalina.tribes.transport.bio.util;
-
-import org.apache.catalina.tribes.ChannelMessage;
-import org.apache.catalina.tribes.ErrorHandler;
-import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.group.InterceptorPayload;
-
-
-
-/**
- * A fast queue that remover thread lock the adder thread. <br/>Limit the queue
- * length when you have strange producer thread problemes.
- * 
- * FIXME add i18n support to log messages
- * @author Rainer Jung
- * @author Peter Rossbach
- * @version $Revision: 345567 $ $Date: 2005-11-18 15:07:23 -0600 (Fri, 18 Nov 2005) $
- */
-public class FastQueue {
-
-    private static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(FastQueue.class);
-
-    /**
-     * This is the actual queue
-     */
-    private SingleRemoveSynchronizedAddLock lock = null;
-
-    /**
-     * First Object at queue (consumer message)
-     */
-    private LinkObject first = null;
-
-    /**
-     * Last object in queue (producer Object)
-     */
-    private LinkObject last = null;
-
-    /**
-     * Current Queue elements size
-     */
-    private int size = 0;
-
-    /**
-     * check lock to detect strange threadings things
-     */
-    private boolean checkLock = false;
-
-    /**
-     * protocol the thread wait times
-     */
-    private boolean timeWait = false;
-
-    private boolean inAdd = false;
-
-    private boolean inRemove = false;
-
-    private boolean inMutex = false;
-
-    /**
-     * limit the queue legnth ( default is unlimited)
-     */
-    private int maxQueueLength = 0;
-
-    /**
-     * addWaitTimeout for producer
-     */
-    private long addWaitTimeout = 10000L;
-
-    
-    /**
-     * removeWaitTimeout for consumer
-     */
-    private long removeWaitTimeout = 30000L;
-
-    /**
-     * enabled the queue
-     */
-    private boolean enabled = true;
-
-    /**
-     *  max queue size
-     */
-    private int maxSize = 0;
-
-    /**
-     *  avg size sample interval
-     */
-    private int sampleInterval = 100;
-
-    /**
-     * Generate Queue SingleRemoveSynchronizedAddLock and set add and wait
-     * Timeouts
-     */
-    public FastQueue() {
-        lock = new SingleRemoveSynchronizedAddLock();
-        lock.setAddWaitTimeout(addWaitTimeout);
-        lock.setRemoveWaitTimeout(removeWaitTimeout);
-    }
-
-    /**
-     * get current add wait timeout
-     * 
-     * @return current wait timeout
-     */
-    public long getAddWaitTimeout() {
-        addWaitTimeout = lock.getAddWaitTimeout();
-        return addWaitTimeout;
-    }
-
-    /**
-     * Set add wait timeout (default 10000 msec)
-     * 
-     * @param timeout
-     */
-    public void setAddWaitTimeout(long timeout) {
-        addWaitTimeout = timeout;
-        lock.setAddWaitTimeout(addWaitTimeout);
-    }
-
-    /**
-     * get current remove wait timeout
-     * 
-     * @return The timeout
-     */
-    public long getRemoveWaitTimeout() {
-        removeWaitTimeout = lock.getRemoveWaitTimeout();
-        return removeWaitTimeout;
-    }
-
-    /**
-     * set remove wait timeout ( default 30000 msec)
-     * 
-     * @param timeout
-     */
-    public void setRemoveWaitTimeout(long timeout) {
-        removeWaitTimeout = timeout;
-        lock.setRemoveWaitTimeout(removeWaitTimeout);
-    }
-
-    /**
-     * get Max Queue length
-     * 
-     * @see org.apache.catalina.tribes.util.IQueue#getMaxQueueLength()
-     */
-    public int getMaxQueueLength() {
-        return maxQueueLength;
-    }
-
-    public void setMaxQueueLength(int length) {
-        maxQueueLength = length;
-    }
-
-    public boolean isEnabled() {
-        return enabled;
-    }
-
-    public void setEnabled(boolean enable) {
-        enabled = enable;
-        if (!enabled) {
-            lock.abortRemove();
-            last = first = null;
-        }
-    }
-
-    /**
-     * @return Returns the checkLock.
-     */
-    public boolean isCheckLock() {
-        return checkLock;
-    }
-
-    /**
-     * @param checkLock The checkLock to set.
-     */
-    public void setCheckLock(boolean checkLock) {
-        this.checkLock = checkLock;
-    }
-
-    
-    /**
-     * @return The max size
-     */
-    public int getMaxSize() {
-        return maxSize;
-    }
-
-    /**
-     * @param size
-     */
-    public void setMaxSize(int size) {
-        maxSize = size;
-    }
-
-    
-    /**
-     * unlock queue for next add 
-     */
-    public void unlockAdd() {
-        lock.unlockAdd(size > 0 ? true : false);
-    }
-
-    /**
-     * unlock queue for next remove 
-     */
-    public void unlockRemove() {
-        lock.unlockRemove();
-    }
-
-    /**
-     * start queuing
-     */
-    public void start() {
-        setEnabled(true);
-    }
-
-    /**
-     * start queuing
-     */
-    public void stop() {
-        setEnabled(false);
-    }
-
-    public int getSize() {
-        return size;
-    }
-
-    public SingleRemoveSynchronizedAddLock getLock() {
-        return lock;
-    }
-
-    /**
-     * Add new data to the queue
-     * @see org.apache.catalina.tribes.util.IQueue#add(java.lang.String, java.lang.Object)
-     * FIXME extract some method
-     */
-    public boolean add(ChannelMessage msg, Member[] destination, InterceptorPayload payload) {
-        boolean ok = true;
-        long time = 0;
-
-        if (!enabled) {
-            if (log.isInfoEnabled())
-                log.info("FastQueue.add: queue disabled, add aborted");
-            return false;
-        }
-
-        if (timeWait) {
-            time = System.currentTimeMillis();
-        }
-        lock.lockAdd();
-        try {
-            if (log.isTraceEnabled()) {
-                log.trace("FastQueue.add: starting with size " + size);
-            }
-            if (checkLock) {
-                if (inAdd)
-                    log.warn("FastQueue.add: Detected other add");
-                inAdd = true;
-                if (inMutex)
-                    log.warn("FastQueue.add: Detected other mutex in add");
-                inMutex = true;
-            }
-
-            if ((maxQueueLength > 0) && (size >= maxQueueLength)) {
-                ok = false;
-                if (log.isTraceEnabled()) {
-                    log.trace("FastQueue.add: Could not add, since queue is full (" + size + ">=" + maxQueueLength + ")");
-                }
-            } else {
-                LinkObject element = new LinkObject(msg,destination, payload);
-                if (size == 0) {
-                    first = last = element;
-                    size = 1;
-                } else {
-                    if (last == null) {
-                        ok = false;
-                        log.error("FastQueue.add: Could not add, since last is null although size is "+ size + " (>0)");
-                    } else {
-                        last.append(element);
-                        last = element;
-                        size++;
-                    }
-                }
-            }
-
-            if (first == null) {
-                log.error("FastQueue.add: first is null, size is " + size + " at end of add");
-            }
-            if (last == null) {
-                log.error("FastQueue.add: last is null, size is " + size+ " at end of add");
-            }
-
-            if (checkLock) {
-                if (!inMutex) log.warn("FastQueue.add: Cancelled by other mutex in add");
-                inMutex = false;
-                if (!inAdd) log.warn("FastQueue.add: Cancelled by other add");
-                inAdd = false;
-            }
-            if (log.isTraceEnabled()) log.trace("FastQueue.add: add ending with size " + size);
-
-        } finally {
-            lock.unlockAdd(true);
-        }
-        return ok;
-    }
-
-    /**
-     * remove the complete queued object list
-     * @see org.apache.catalina.tribes.util.IQueue#remove()
-     * FIXME extract some method
-     */
-    public LinkObject remove() {
-        LinkObject element;
-        boolean gotLock;
-        long time = 0;
-
-        if (!enabled) {
-            if (log.isInfoEnabled())
-                log.info("FastQueue.remove: queue disabled, remove aborted");
-            return null;
-        }
-
-        if (timeWait) {
-            time = System.currentTimeMillis();
-        }
-        gotLock = lock.lockRemove();
-        try {
-
-            if (!gotLock) {
-                if (enabled) {
-                    if (log.isInfoEnabled())
-                        log.info("FastQueue.remove: Remove aborted although queue enabled");
-                } else {
-                    if (log.isInfoEnabled())
-                        log.info("FastQueue.remove: queue disabled, remove aborted");
-                }
-                return null;
-            }
-
-            if (log.isTraceEnabled()) {
-                log.trace("FastQueue.remove: remove starting with size " + size);
-            }
-            if (checkLock) {
-                if (inRemove)
-                    log.warn("FastQueue.remove: Detected other remove");
-                inRemove = true;
-                if (inMutex)
-                    log.warn("FastQueue.remove: Detected other mutex in remove");
-                inMutex = true;
-            }
-
-            element = first;
-
-            first = last = null;
-            size = 0;
-
-            if (checkLock) {
-                if (!inMutex)
-                    log.warn("FastQueue.remove: Cancelled by other mutex in remove");
-                inMutex = false;
-                if (!inRemove)
-                    log.warn("FastQueue.remove: Cancelled by other remove");
-                inRemove = false;
-            }
-            if (log.isTraceEnabled()) {
-                log.trace("FastQueue.remove: remove ending with size " + size);
-            }
-
-            if (timeWait) {
-                time = System.currentTimeMillis();
-            }
-        } finally {
-            lock.unlockRemove();
-        }
-        return element;
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.transport.bio.util;
+
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.ErrorHandler;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.InterceptorPayload;
+
+
+
+/**
+ * A fast queue where the remover thread locks the adder thread. <br/>Limit the queue
+ * length when you have strange producer thread problems.
+ * 
+ * FIXME add i18n support to log messages
+ * @author Rainer Jung
+ * @author Peter Rossbach
+ * @version $Revision$ $Date$
+ */
+public class FastQueue {
+
+    private static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(FastQueue.class);
+
+    /**
+     * Lock guarding the actual queue (the first/last linked list)
+     */
+    private SingleRemoveSynchronizedAddLock lock = null;
+
+    /**
+     * First Object at queue (consumer message)
+     */
+    private LinkObject first = null;
+
+    /**
+     * Last object in queue (producer Object)
+     */
+    private LinkObject last = null;
+
+    /**
+     * Current Queue elements size
+     */
+    private int size = 0;
+
+    /**
+     * check lock to detect strange threading problems
+     */
+    private boolean checkLock = false;
+
+    /**
+     * record the thread wait times
+     */
+    private boolean timeWait = false;
+
+    private boolean inAdd = false;
+
+    private boolean inRemove = false;
+
+    private boolean inMutex = false;
+
+    /**
+     * limit the queue length ( default is unlimited)
+     */
+    private int maxQueueLength = 0;
+
+    /**
+     * addWaitTimeout for producer
+     */
+    private long addWaitTimeout = 10000L;
+
+    
+    /**
+     * removeWaitTimeout for consumer
+     */
+    private long removeWaitTimeout = 30000L;
+
+    /**
+     * enabled the queue
+     */
+    private boolean enabled = true;
+
+    /**
+     *  max queue size
+     */
+    private int maxSize = 0;
+
+    /**
+     *  avg size sample interval
+     */
+    private int sampleInterval = 100;
+
+    /**
+     * Create the queue's SingleRemoveSynchronizedAddLock and set the add and
+     * remove wait timeouts
+     */
+    public FastQueue() {
+        lock = new SingleRemoveSynchronizedAddLock();
+        lock.setAddWaitTimeout(addWaitTimeout);
+        lock.setRemoveWaitTimeout(removeWaitTimeout);
+    }
+
+    /**
+     * get current add wait timeout
+     * 
+     * @return current wait timeout
+     */
+    public long getAddWaitTimeout() {
+        addWaitTimeout = lock.getAddWaitTimeout();
+        return addWaitTimeout;
+    }
+
+    /**
+     * Set add wait timeout (default 10000 msec)
+     * 
+     * @param timeout
+     */
+    public void setAddWaitTimeout(long timeout) {
+        addWaitTimeout = timeout;
+        lock.setAddWaitTimeout(addWaitTimeout);
+    }
+
+    /**
+     * get current remove wait timeout
+     * 
+     * @return The timeout
+     */
+    public long getRemoveWaitTimeout() {
+        removeWaitTimeout = lock.getRemoveWaitTimeout();
+        return removeWaitTimeout;
+    }
+
+    /**
+     * set remove wait timeout ( default 30000 msec)
+     * 
+     * @param timeout
+     */
+    public void setRemoveWaitTimeout(long timeout) {
+        removeWaitTimeout = timeout;
+        lock.setRemoveWaitTimeout(removeWaitTimeout);
+    }
+
+    /**
+     * get Max Queue length
+     * 
+     * @see org.apache.catalina.tribes.util.IQueue#getMaxQueueLength()
+     */
+    public int getMaxQueueLength() {
+        return maxQueueLength;
+    }
+
+    public void setMaxQueueLength(int length) {
+        maxQueueLength = length;
+    }
+
+    public boolean isEnabled() {
+        return enabled;
+    }
+
+    public void setEnabled(boolean enable) {
+        enabled = enable;
+        if (!enabled) {
+            lock.abortRemove();
+            last = first = null;
+        }
+    }
+
+    /**
+     * @return Returns the checkLock.
+     */
+    public boolean isCheckLock() {
+        return checkLock;
+    }
+
+    /**
+     * @param checkLock The checkLock to set.
+     */
+    public void setCheckLock(boolean checkLock) {
+        this.checkLock = checkLock;
+    }
+
+    
+    /**
+     * @return The max size
+     */
+    public int getMaxSize() {
+        return maxSize;
+    }
+
+    /**
+     * @param size
+     */
+    public void setMaxSize(int size) {
+        maxSize = size;
+    }
+
+    
+    /**
+     * unlock queue for next add 
+     */
+    public void unlockAdd() {
+        lock.unlockAdd(size > 0 ? true : false);
+    }
+
+    /**
+     * unlock queue for next remove 
+     */
+    public void unlockRemove() {
+        lock.unlockRemove();
+    }
+
+    /**
+     * start queuing
+     */
+    public void start() {
+        setEnabled(true);
+    }
+
+    /**
+     * stop queuing
+     */
+    public void stop() {
+        setEnabled(false);
+    }
+
+    public int getSize() {
+        return size;
+    }
+
+    public SingleRemoveSynchronizedAddLock getLock() {
+        return lock;
+    }
+
+    /**
+     * Add new data to the queue
+     * @see org.apache.catalina.tribes.util.IQueue#add(java.lang.String, java.lang.Object)
+     * FIXME extract some method
+     */
+    public boolean add(ChannelMessage msg, Member[] destination, InterceptorPayload payload) {
+        boolean ok = true;
+        long time = 0;
+
+        if (!enabled) {
+            if (log.isInfoEnabled())
+                log.info("FastQueue.add: queue disabled, add aborted");
+            return false;
+        }
+
+        if (timeWait) {
+            time = System.currentTimeMillis();
+        }
+        lock.lockAdd();
+        try {
+            if (log.isTraceEnabled()) {
+                log.trace("FastQueue.add: starting with size " + size);
+            }
+            if (checkLock) {
+                if (inAdd)
+                    log.warn("FastQueue.add: Detected other add");
+                inAdd = true;
+                if (inMutex)
+                    log.warn("FastQueue.add: Detected other mutex in add");
+                inMutex = true;
+            }
+
+            if ((maxQueueLength > 0) && (size >= maxQueueLength)) {
+                ok = false;
+                if (log.isTraceEnabled()) {
+                    log.trace("FastQueue.add: Could not add, since queue is full (" + size + ">=" + maxQueueLength + ")");
+                }
+            } else {
+                LinkObject element = new LinkObject(msg,destination, payload);
+                if (size == 0) {
+                    first = last = element;
+                    size = 1;
+                } else {
+                    if (last == null) {
+                        ok = false;
+                        log.error("FastQueue.add: Could not add, since last is null although size is "+ size + " (>0)");
+                    } else {
+                        last.append(element);
+                        last = element;
+                        size++;
+                    }
+                }
+            }
+
+            if (first == null) {
+                log.error("FastQueue.add: first is null, size is " + size + " at end of add");
+            }
+            if (last == null) {
+                log.error("FastQueue.add: last is null, size is " + size+ " at end of add");
+            }
+
+            if (checkLock) {
+                if (!inMutex) log.warn("FastQueue.add: Cancelled by other mutex in add");
+                inMutex = false;
+                if (!inAdd) log.warn("FastQueue.add: Cancelled by other add");
+                inAdd = false;
+            }
+            if (log.isTraceEnabled()) log.trace("FastQueue.add: add ending with size " + size);
+
+        } finally {
+            lock.unlockAdd(true);
+        }
+        return ok;
+    }
+
+    /**
+     * remove the complete queued object list
+     * @see org.apache.catalina.tribes.util.IQueue#remove()
+     * FIXME extract some method
+     */
+    public LinkObject remove() {
+        LinkObject element;
+        boolean gotLock;
+        long time = 0;
+
+        if (!enabled) {
+            if (log.isInfoEnabled())
+                log.info("FastQueue.remove: queue disabled, remove aborted");
+            return null;
+        }
+
+        if (timeWait) {
+            time = System.currentTimeMillis();
+        }
+        gotLock = lock.lockRemove();
+        try {
+
+            if (!gotLock) {
+                if (enabled) {
+                    if (log.isInfoEnabled())
+                        log.info("FastQueue.remove: Remove aborted although queue enabled");
+                } else {
+                    if (log.isInfoEnabled())
+                        log.info("FastQueue.remove: queue disabled, remove aborted");
+                }
+                return null;
+            }
+
+            if (log.isTraceEnabled()) {
+                log.trace("FastQueue.remove: remove starting with size " + size);
+            }
+            if (checkLock) {
+                if (inRemove)
+                    log.warn("FastQueue.remove: Detected other remove");
+                inRemove = true;
+                if (inMutex)
+                    log.warn("FastQueue.remove: Detected other mutex in remove");
+                inMutex = true;
+            }
+
+            element = first;
+
+            first = last = null;
+            size = 0;
+
+            if (checkLock) {
+                if (!inMutex)
+                    log.warn("FastQueue.remove: Cancelled by other mutex in remove");
+                inMutex = false;
+                if (!inRemove)
+                    log.warn("FastQueue.remove: Cancelled by other remove");
+                inRemove = false;
+            }
+            if (log.isTraceEnabled()) {
+                log.trace("FastQueue.remove: remove ending with size " + size);
+            }
+
+            if (timeWait) {
+                time = System.currentTimeMillis();
+            }
+        } finally {
+            lock.unlockRemove();
+        }
+        return element;
+    }
+
+}

Propchange: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/util/FastQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/util/FastQueue.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/util/LinkObject.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/util/LinkObject.java?view=diff&rev=467222&r1=467221&r2=467222
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/util/LinkObject.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/util/LinkObject.java Mon Oct 23 20:17:11 2006
@@ -1,108 +1,108 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.catalina.tribes.transport.bio.util;
-
-import org.apache.catalina.tribes.ChannelMessage;
-import org.apache.catalina.tribes.ErrorHandler;
-import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.group.InterceptorPayload;
-
-/**
- * The class <b>LinkObject</b> implements an element
- * for a linked list, consisting of a general
- * data object and a pointer to the next element.
- *
- * @author Rainer Jung
- * @author Peter Rossbach
- * @author Filip Hanik
- * @version $Revision: 304032 $ $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul 2005) $
-
- */
-
-public class LinkObject {
-
-    private ChannelMessage msg;
-    private LinkObject next;
-    private byte[] key ;
-    private Member[] destination;
-    private InterceptorPayload payload;
-
-    /**
-     * Construct a new element from the data object.
-     * Sets the pointer to null.
-     *
-     * @param key The key
-     * @param payload The data object.
-     */
-    public LinkObject(ChannelMessage msg, Member[] destination, InterceptorPayload payload) {
-        this.msg = msg;
-        this.next = null;
-        this.key = msg.getUniqueId();
-        this.payload = payload;
-        this.destination = destination;
-    }
-
-    /**
-     * Set the next element.
-     * @param next The next element.
-     */
-    public void append(LinkObject next) {
-        this.next = next;
-    }
-
-    /**
-     * Get the next element.
-     * @return The next element.
-     */
-    public LinkObject next() {
-        return next;
-    }
-    
-    public void setNext(LinkObject next) {
-        this.next = next;
-    }
-
-    /**
-     * Get the data object from the element.
-     * @return The data object from the element.
-     */
-    public ChannelMessage data() {
-        return msg;
-    }
-
-    /**
-     * Get the unique message id
-     * @return the unique message id
-     */
-    public byte[] getKey() {
-        return key;
-    }
-
-    public ErrorHandler getHandler() {
-        return payload!=null?payload.getErrorHandler():null;
-    }
-
-    public InterceptorPayload getPayload() {
-        return payload;
-    }
-
-    public Member[] getDestination() {
-        return destination;
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.transport.bio.util;
+
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.ErrorHandler;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.InterceptorPayload;
+
+/**
+ * The class <b>LinkObject</b> implements an element
+ * for a linked list, consisting of a general
+ * data object and a pointer to the next element.
+ *
+ * @author Rainer Jung
+ * @author Peter Rossbach
+ * @author Filip Hanik
+ * @version $Revision$ $Date$
+
+ */
+
+public class LinkObject {
+
+    private ChannelMessage msg;
+    private LinkObject next;
+    private byte[] key ;
+    private Member[] destination;
+    private InterceptorPayload payload;
+
+    /**
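+     * Construct a new element from the data object.
+     * Sets the pointer to null.
+     * @param msg The message; its unique id is stored as the key
+     * @param destination The destination members
+     * @param payload The interceptor payload; may be null (see getHandler)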
+     */
+    public LinkObject(ChannelMessage msg, Member[] destination, InterceptorPayload payload) {
+        this.msg = msg;
+        this.next = null;
+        this.key = msg.getUniqueId();
+        this.payload = payload;
+        this.destination = destination;
+    }
+
+    /**
+     * Set the next element.
+     * @param next The next element.
+     */
+    public void append(LinkObject next) {
+        this.next = next;
+    }
+
+    /**
+     * Get the next element.
+     * @return The next element.
+     */
+    public LinkObject next() {
+        return next;
+    }
+    
+    public void setNext(LinkObject next) {
+        this.next = next;
+    }
+
+    /**
+     * Get the data object from the element.
+     * @return The data object from the element.
+     */
+    public ChannelMessage data() {
+        return msg;
+    }
+
+    /**
+     * Get the unique message id
+     * @return the unique message id
+     */
+    public byte[] getKey() {
+        return key;
+    }
+
+    public ErrorHandler getHandler() {
+        return payload!=null?payload.getErrorHandler():null;
+    }
+
+    public InterceptorPayload getPayload() {
+        return payload;
+    }
+
+    public Member[] getDestination() {
+        return destination;
+    }
+
+}

Propchange: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/util/LinkObject.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/util/LinkObject.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/util/SingleRemoveSynchronizedAddLock.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/util/SingleRemoveSynchronizedAddLock.java?view=diff&rev=467222&r1=467221&r2=467222
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/util/SingleRemoveSynchronizedAddLock.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/util/SingleRemoveSynchronizedAddLock.java Mon Oct 23 20:17:11 2006
@@ -1,254 +1,254 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.catalina.tribes.transport.bio.util;
-
-/**
- * The class <b>SingleRemoveSynchronizedAddLock</b> implement locking for accessing the queue
- * by a single remove thread and multiple add threads.
- *
- * A thread is only allowed to be either the remove or
- * an add thread.
- *
- * The lock can either be owned by the remove thread
- * or by a single add thread.
- *
- * If the remove thread tries to get the lock,
- * but the queue is empty, it will block (poll)
- * until an add threads adds an entry to the queue and
- * releases the lock.
- * 
- * If the remove thread and add threads compete for
- * the lock and an add thread releases the lock, then
- * the remove thread will get the lock first.
- *
- * The remove thread removes all entries in the queue
- * at once and proceeses them without further
- * polling the queue.
- *
- * The lock is not reentrant, in the sense, that all
- * threads must release an owned lock before competing
- * for the lock again!
- *
- * @author Rainer Jung
- * @author Peter Rossbach
- * @version 1.1
- */
- 
-public class SingleRemoveSynchronizedAddLock {
-    
-    public SingleRemoveSynchronizedAddLock() {
-    }
-    
-    public SingleRemoveSynchronizedAddLock(boolean dataAvailable) {
-        this.dataAvailable=dataAvailable;
-    }
-    
-    /**
-     * Time in milliseconds after which threads
-     * waiting for an add lock are woken up.
-     * This is used as a safety measure in case
-     * thread notification via the unlock methods
-     * has a bug.
-     */
-    private long addWaitTimeout = 10000L;
-
-    /**
-     * Time in milliseconds after which threads
-     * waiting for a remove lock are woken up.
-     * This is used as a safety measure in case
-     * thread notification via the unlock methods
-     * has a bug.
-     */
-    private long removeWaitTimeout = 30000L;
-
-    /**
-     * The current remove thread.
-     * It is set to the remove thread polling for entries.
-     * It is reset to null when the remove thread
-     * releases the lock and proceeds processing
-     * the removed entries.
-     */
-    private Thread remover = null;
-
-    /**
-     * A flag indicating, if an add thread owns the lock.
-     */
-    private boolean addLocked = false;
-
-    /**
-     * A flag indicating, if the remove thread owns the lock.
-     */
-    private boolean removeLocked = false;
-
-    /**
-     * A flag indicating, if the remove thread is allowed
-     * to wait for the lock. The flag is set to false, when aborting.
-     */
-    private boolean removeEnabled = true;
-
-    /**
-     * A flag indicating, if the remover needs polling.
-     * It indicates, if the locked object has data available
-     * to be removed.
-     */
-    private boolean dataAvailable = false;
-
-    /**
-     * @return Value of addWaitTimeout
-     */
-    public synchronized long getAddWaitTimeout() {
-        return addWaitTimeout;
-    }
-
-    /**
-     * Set value of addWaitTimeout
-     */
-    public synchronized void setAddWaitTimeout(long timeout) {
-        addWaitTimeout = timeout;
-    }
-
-    /**
-     * @return Value of removeWaitTimeout
-     */
-    public synchronized long getRemoveWaitTimeout() {
-        return removeWaitTimeout;
-    }
-
-    /**
-     * Set value of removeWaitTimeout
-     */
-    public synchronized void setRemoveWaitTimeout(long timeout) {
-        removeWaitTimeout = timeout;
-    }
-
-    /**
-     * Check if the locked object has data available
-     * i.e. the remover can stop poling and get the lock.
-     * @return True iff the lock Object has data available.
-     */
-    public synchronized boolean isDataAvailable() {
-        return dataAvailable;
-    }
-
-    /**
-     * Check if an add thread owns the lock.
-     * @return True iff an add thread owns the lock.
-     */
-    public synchronized boolean isAddLocked() {
-        return addLocked;
-    }
-
-    /**
-     * Check if the remove thread owns the lock.
-     * @return True iff the remove thread owns the lock.
-     */
-    public synchronized boolean isRemoveLocked() {
-        return removeLocked;
-    }
-
-    /**
-     * Check if the remove thread is polling.
-     * @return True iff the remove thread is polling.
-     */
-    public synchronized boolean isRemovePolling() {
-        if ( remover != null ) {
-            return true;
-        }
-        return false;
-    }
-
-    /**
-     * Acquires the lock by an add thread and sets the add flag.
-     * If any add thread or the remove thread already acquired the lock
-     * this add thread will block until the lock is released.
-     */
-    public synchronized void lockAdd() {
-        if ( addLocked || removeLocked ) {
-            do {
-                try {
-                    wait(addWaitTimeout);
-                } catch ( InterruptedException e ) {
-                    Thread.currentThread().interrupted();
-                }
-            } while ( addLocked || removeLocked );
-        }
-        addLocked=true;
-    }
-
-    /**
-     * Acquires the lock by the remove thread and sets the remove flag.
-     * If any add thread already acquired the lock or the queue is
-     * empty, the remove thread will block until the lock is released
-     * and the queue is not empty.
-     */
-    public synchronized boolean lockRemove() {
-        removeLocked=false;
-        removeEnabled=true;
-        if ( ( addLocked || ! dataAvailable ) && removeEnabled ) {
-            remover=Thread.currentThread();
-            do {
-                try {
-                    wait(removeWaitTimeout);
-                } catch ( InterruptedException e ) {
-                    Thread.currentThread().interrupted();
-                }
-            } while ( ( addLocked || ! dataAvailable ) && removeEnabled );
-            remover=null;
-        }
-        if ( removeEnabled ) {
-            removeLocked=true;
-        } 
-        return removeLocked;
-    }
-
-    /**
-     * Releases the lock by an add thread and reset the remove flag.
-     * If the reader thread is polling, notify it.
-     */
-    public synchronized void unlockAdd(boolean dataAvailable) {
-        addLocked=false;
-        this.dataAvailable=dataAvailable;
-        if ( ( remover != null ) && ( dataAvailable || ! removeEnabled ) ) {
-            remover.interrupt();
-        } else {
-            notifyAll();
-        }
-    }
-
-    /**
-     * Releases the lock by the remove thread and reset the add flag.
-     * Notify all waiting add threads,
-     * that the lock has been released by the remove thread.
-     */
-    public synchronized void unlockRemove() {
-        removeLocked=false;
-        dataAvailable=false;
-        notifyAll();
-    }
-
-    /**
-     * Abort any polling remover thread
-     */
-    public synchronized void abortRemove() {
-        removeEnabled=false;
-        if ( remover != null ) {
-            remover.interrupt();
-        }
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.transport.bio.util;
+
+/**
+ * The class <b>SingleRemoveSynchronizedAddLock</b> implements locking for accessing the queue
+ * by a single remove thread and multiple add threads.
+ *
+ * A thread is only allowed to be either the remove or
+ * an add thread.
+ *
+ * The lock can either be owned by the remove thread
+ * or by a single add thread.
+ *
+ * If the remove thread tries to get the lock,
+ * but the queue is empty, it will block (poll)
+ * until an add thread adds an entry to the queue and
+ * releases the lock.
+ * 
+ * If the remove thread and add threads compete for
+ * the lock and an add thread releases the lock, then
+ * the remove thread will get the lock first.
+ *
+ * The remove thread removes all entries in the queue
+ * at once and processes them without further
+ * polling the queue.
+ *
+ * The lock is not reentrant, in the sense, that all
+ * threads must release an owned lock before competing
+ * for the lock again!
+ *
+ * @author Rainer Jung
+ * @author Peter Rossbach
+ * @version 1.1
+ */
+ 
+public class SingleRemoveSynchronizedAddLock {
+    
+    public SingleRemoveSynchronizedAddLock() {
+    }
+    
+    public SingleRemoveSynchronizedAddLock(boolean dataAvailable) {
+        this.dataAvailable=dataAvailable;
+    }
+    
+    /**
+     * Time in milliseconds after which threads
+     * waiting for an add lock are woken up.
+     * This is used as a safety measure in case
+     * thread notification via the unlock methods
+     * has a bug.
+     */
+    private long addWaitTimeout = 10000L;
+
+    /**
+     * Time in milliseconds after which threads
+     * waiting for a remove lock are woken up.
+     * This is used as a safety measure in case
+     * thread notification via the unlock methods
+     * has a bug.
+     */
+    private long removeWaitTimeout = 30000L;
+
+    /**
+     * The current remove thread.
+     * It is set to the remove thread polling for entries.
+     * It is reset to null when the remove thread
+     * releases the lock and proceeds processing
+     * the removed entries.
+     */
+    private Thread remover = null;
+
+    /**
+     * A flag indicating, if an add thread owns the lock.
+     */
+    private boolean addLocked = false;
+
+    /**
+     * A flag indicating, if the remove thread owns the lock.
+     */
+    private boolean removeLocked = false;
+
+    /**
+     * A flag indicating, if the remove thread is allowed
+     * to wait for the lock. The flag is set to false, when aborting.
+     */
+    private boolean removeEnabled = true;
+
+    /**
+     * A flag indicating, if the remover needs polling.
+     * It indicates, if the locked object has data available
+     * to be removed.
+     */
+    private boolean dataAvailable = false;
+
+    /**
+     * @return Value of addWaitTimeout
+     */
+    public synchronized long getAddWaitTimeout() {
+        return addWaitTimeout;
+    }
+
+    /**
+     * Set value of addWaitTimeout
+     */
+    public synchronized void setAddWaitTimeout(long timeout) {
+        addWaitTimeout = timeout;
+    }
+
+    /**
+     * @return Value of removeWaitTimeout
+     */
+    public synchronized long getRemoveWaitTimeout() {
+        return removeWaitTimeout;
+    }
+
+    /**
+     * Set value of removeWaitTimeout
+     */
+    public synchronized void setRemoveWaitTimeout(long timeout) {
+        removeWaitTimeout = timeout;
+    }
+
+    /**
+     * Check if the locked object has data available
+     * i.e. the remover can stop polling and get the lock.
+     * @return True iff the lock Object has data available.
+     */
+    public synchronized boolean isDataAvailable() {
+        return dataAvailable;
+    }
+
+    /**
+     * Check if an add thread owns the lock.
+     * @return True iff an add thread owns the lock.
+     */
+    public synchronized boolean isAddLocked() {
+        return addLocked;
+    }
+
+    /**
+     * Check if the remove thread owns the lock.
+     * @return True iff the remove thread owns the lock.
+     */
+    public synchronized boolean isRemoveLocked() {
+        return removeLocked;
+    }
+
+    /**
+     * Check if the remove thread is polling.
+     * @return True iff the remove thread is polling.
+     */
+    public synchronized boolean isRemovePolling() {
+        if ( remover != null ) {
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Acquires the lock by an add thread and sets the add flag.
+     * If any add thread or the remove thread already acquired the lock
+     * this add thread will block until the lock is released.
+     */
+    public synchronized void lockAdd() {
+        if ( addLocked || removeLocked ) {
+            do {
+                try {
+                    wait(addWaitTimeout);
+                } catch ( InterruptedException e ) {
+                    Thread.currentThread().interrupted();
+                }
+            } while ( addLocked || removeLocked );
+        }
+        addLocked=true;
+    }
+
+    /**
+     * Acquires the lock by the remove thread and sets the remove flag.
+     * If any add thread already acquired the lock or the queue is
+     * empty, the remove thread will block until the lock is released
+     * and the queue is not empty.
+     */
+    public synchronized boolean lockRemove() {
+        removeLocked=false;
+        removeEnabled=true;
+        if ( ( addLocked || ! dataAvailable ) && removeEnabled ) {
+            remover=Thread.currentThread();
+            do {
+                try {
+                    wait(removeWaitTimeout);
+                } catch ( InterruptedException e ) {
+                    Thread.currentThread().interrupted();
+                }
+            } while ( ( addLocked || ! dataAvailable ) && removeEnabled );
+            remover=null;
+        }
+        if ( removeEnabled ) {
+            removeLocked=true;
+        } 
+        return removeLocked;
+    }
+
+    /**
+     * Releases the lock held by an add thread and resets the add flag.
+     * If the remove thread is polling, notify it.
+     */
+    public synchronized void unlockAdd(boolean dataAvailable) {
+        addLocked=false;
+        this.dataAvailable=dataAvailable;
+        if ( ( remover != null ) && ( dataAvailable || ! removeEnabled ) ) {
+            remover.interrupt();
+        } else {
+            notifyAll();
+        }
+    }
+
+    /**
+     * Releases the lock held by the remove thread and resets the remove flag.
+     * Notify all waiting add threads,
+     * that the lock has been released by the remove thread.
+     */
+    public synchronized void unlockRemove() {
+        removeLocked=false;
+        dataAvailable=false;
+        notifyAll();
+    }
+
+    /**
+     * Abort any polling remover thread
+     */
+    public synchronized void abortRemove() {
+        removeEnabled=false;
+        if ( remover != null ) {
+            remover.interrupt();
+        }
+    }
+
+}

Propchange: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/util/SingleRemoveSynchronizedAddLock.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/util/SingleRemoveSynchronizedAddLock.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Propchange: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/mbeans-descriptors.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/mbeans-descriptors.xml
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org