You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2006/10/24 04:45:59 UTC
svn commit: r467206 [15/30] - in /tomcat: build/tc5.5.x/ connectors/trunk/
connectors/trunk/ajp/ajplib/test/ connectors/trunk/ajp/proxy/
connectors/trunk/jk/jkstatus/src/share/org/apache/jk/status/
connectors/trunk/jk/native/iis/ connectors/trunk/jk/na...
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ThreadPool.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ThreadPool.java?view=diff&rev=467206&r1=467205&r2=467206
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ThreadPool.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ThreadPool.java Mon Oct 23 19:45:46 2006
@@ -1,165 +1,165 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.catalina.tribes.transport;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * @author not attributable
- * @version 1.0
- */
-
-public class ThreadPool
-{
- /**
- * A very simple thread pool class. The pool size is set at
- * construction time and remains fixed. Threads are cycled
- * through a FIFO idle queue.
- */
-
- List idle = new LinkedList();
- List used = new LinkedList();
-
- Object mutex = new Object();
- boolean running = true;
-
- private static int counter = 1;
- private int maxThreads;
- private int minThreads;
-
- private ThreadCreator creator = null;
-
- private static synchronized int inc() {
- return counter++;
- }
-
-
- public ThreadPool (int maxThreads, int minThreads, ThreadCreator creator) throws Exception {
- // fill up the pool with worker threads
- this.maxThreads = maxThreads;
- this.minThreads = minThreads;
- this.creator = creator;
- //for (int i = 0; i < minThreads; i++) {
- for (int i = 0; i < maxThreads; i++) { //temporary fix for thread hand off problem
- WorkerThread thread = creator.getWorkerThread();
- setupThread(thread);
- idle.add (thread);
- }
- }
-
- protected void setupThread(WorkerThread thread) {
- synchronized (thread) {
- thread.setPool(this);
- thread.setName(thread.getClass().getName() + "[" + inc() + "]");
- thread.setDaemon(true);
- thread.setPriority(Thread.MAX_PRIORITY);
- thread.start();
- try {thread.wait(500); }catch ( InterruptedException x ) {}
- }
- }
-
- /**
- * Find an idle worker thread, if any. Could return null.
- */
- public WorkerThread getWorker()
- {
- WorkerThread worker = null;
- synchronized (mutex) {
- while ( worker == null && running ) {
- if (idle.size() > 0) {
- try {
- worker = (WorkerThread) idle.remove(0);
- } catch (java.util.NoSuchElementException x) {
- //this means that there are no available workers
- worker = null;
- }
- } else if ( used.size() < this.maxThreads && creator != null) {
- worker = creator.getWorkerThread();
- setupThread(worker);
- } else {
- try { mutex.wait(); } catch ( java.lang.InterruptedException x ) {Thread.currentThread().interrupted();}
- }
- }//while
- if ( worker != null ) used.add(worker);
- }
- return (worker);
- }
-
- public int available() {
- return idle.size();
- }
-
- /**
- * Called by the worker thread to return itself to the
- * idle pool.
- */
- public void returnWorker (WorkerThread worker) {
- if ( running ) {
- synchronized (mutex) {
- used.remove(worker);
- //if ( idle.size() < minThreads && !idle.contains(worker)) idle.add(worker);
- if ( idle.size() < maxThreads && !idle.contains(worker)) idle.add(worker); //let max be the upper limit
- else {
- worker.setDoRun(false);
- synchronized (worker){worker.notify();}
- }
- mutex.notify();
- }
- }else {
- worker.setDoRun(false);
- synchronized (worker){worker.notify();}
- }
- }
-
- public int getMaxThreads() {
- return maxThreads;
- }
-
- public int getMinThreads() {
- return minThreads;
- }
-
- public void stop() {
- running = false;
- synchronized (mutex) {
- Iterator i = idle.iterator();
- while ( i.hasNext() ) {
- WorkerThread worker = (WorkerThread)i.next();
- returnWorker(worker);
- i.remove();
- }
- }
- }
-
- public void setMaxThreads(int maxThreads) {
- this.maxThreads = maxThreads;
- }
-
- public void setMinThreads(int minThreads) {
- this.minThreads = minThreads;
- }
-
- public ThreadCreator getThreadCreator() {
- return this.creator;
- }
-
- public static interface ThreadCreator {
- public WorkerThread getWorkerThread();
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.transport;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * @author not attributable
+ * @version 1.0
+ */
+
+public class ThreadPool
+{
+ /**
+ * A very simple thread pool class. The pool size is set at
+ * construction time and remains fixed. Threads are cycled
+ * through a FIFO idle queue.
+ */
+
+ List idle = new LinkedList();
+ List used = new LinkedList();
+
+ Object mutex = new Object();
+ boolean running = true;
+
+ private static int counter = 1;
+ private int maxThreads;
+ private int minThreads;
+
+ private ThreadCreator creator = null;
+
+ private static synchronized int inc() {
+ return counter++;
+ }
+
+
+ public ThreadPool (int maxThreads, int minThreads, ThreadCreator creator) throws Exception {
+ // fill up the pool with worker threads
+ this.maxThreads = maxThreads;
+ this.minThreads = minThreads;
+ this.creator = creator;
+ //for (int i = 0; i < minThreads; i++) {
+ for (int i = 0; i < maxThreads; i++) { //temporary fix for thread hand off problem
+ WorkerThread thread = creator.getWorkerThread();
+ setupThread(thread);
+ idle.add (thread);
+ }
+ }
+
+ protected void setupThread(WorkerThread thread) {
+ synchronized (thread) {
+ thread.setPool(this);
+ thread.setName(thread.getClass().getName() + "[" + inc() + "]");
+ thread.setDaemon(true);
+ thread.setPriority(Thread.MAX_PRIORITY);
+ thread.start();
+ try {thread.wait(500); }catch ( InterruptedException x ) {}
+ }
+ }
+
+ /**
+ * Find an idle worker thread, if any. Could return null.
+ */
+ public WorkerThread getWorker()
+ {
+ WorkerThread worker = null;
+ synchronized (mutex) {
+ while ( worker == null && running ) {
+ if (idle.size() > 0) {
+ try {
+ worker = (WorkerThread) idle.remove(0);
+ } catch (java.util.NoSuchElementException x) {
+ //this means that there are no available workers
+ worker = null;
+ }
+ } else if ( used.size() < this.maxThreads && creator != null) {
+ worker = creator.getWorkerThread();
+ setupThread(worker);
+ } else {
+ try { mutex.wait(); } catch ( java.lang.InterruptedException x ) {Thread.currentThread().interrupted();}
+ }
+ }//while
+ if ( worker != null ) used.add(worker);
+ }
+ return (worker);
+ }
+
+ public int available() {
+ return idle.size();
+ }
+
+ /**
+ * Called by the worker thread to return itself to the
+ * idle pool.
+ */
+ public void returnWorker (WorkerThread worker) {
+ if ( running ) {
+ synchronized (mutex) {
+ used.remove(worker);
+ //if ( idle.size() < minThreads && !idle.contains(worker)) idle.add(worker);
+ if ( idle.size() < maxThreads && !idle.contains(worker)) idle.add(worker); //let max be the upper limit
+ else {
+ worker.setDoRun(false);
+ synchronized (worker){worker.notify();}
+ }
+ mutex.notify();
+ }
+ }else {
+ worker.setDoRun(false);
+ synchronized (worker){worker.notify();}
+ }
+ }
+
+ public int getMaxThreads() {
+ return maxThreads;
+ }
+
+ public int getMinThreads() {
+ return minThreads;
+ }
+
+ public void stop() {
+ running = false;
+ synchronized (mutex) {
+ Iterator i = idle.iterator();
+ while ( i.hasNext() ) {
+ WorkerThread worker = (WorkerThread)i.next();
+ returnWorker(worker);
+ i.remove();
+ }
+ }
+ }
+
+ public void setMaxThreads(int maxThreads) {
+ this.maxThreads = maxThreads;
+ }
+
+ public void setMinThreads(int minThreads) {
+ this.minThreads = minThreads;
+ }
+
+ public ThreadCreator getThreadCreator() {
+ return this.creator;
+ }
+
+ public static interface ThreadCreator {
+ public WorkerThread getWorkerThread();
+ }
+}
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ThreadPool.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ThreadPool.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/WorkerThread.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/WorkerThread.java?view=diff&rev=467206&r1=467205&r2=467206
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/WorkerThread.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/WorkerThread.java Mon Oct 23 19:45:46 2006
@@ -1,89 +1,89 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.catalina.tribes.transport;
-
-import org.apache.catalina.tribes.io.ListenCallback;
-
-
-
-
-/**
- * @author Filip Hanik
- * @version $Revision: 366253 $ $Date: 2006-01-05 13:30:42 -0600 (Thu, 05 Jan 2006) $
- */
-public abstract class WorkerThread extends Thread
-{
-
- public static final int OPTION_DIRECT_BUFFER = ReceiverBase.OPTION_DIRECT_BUFFER;
-
- private ListenCallback callback;
- private ThreadPool pool;
- private boolean doRun = true;
- private int options;
- protected boolean useBufferPool = true;
-
- public WorkerThread(ListenCallback callback) {
- this.callback = callback;
- }
-
- public void setPool(ThreadPool pool) {
- this.pool = pool;
- }
-
- public void setOptions(int options) {
- this.options = options;
- }
-
- public void setCallback(ListenCallback callback) {
- this.callback = callback;
- }
-
- public void setDoRun(boolean doRun) {
- this.doRun = doRun;
- }
-
- public ThreadPool getPool() {
- return pool;
- }
-
- public int getOptions() {
- return options;
- }
-
- public ListenCallback getCallback() {
- return callback;
- }
-
- public boolean isDoRun() {
- return doRun;
- }
-
- public void close()
- {
- doRun = false;
- notify();
- }
-
- public void setUseBufferPool(boolean usebufpool) {
- useBufferPool = usebufpool;
- }
-
- public boolean getUseBufferPool() {
- return useBufferPool;
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.transport;
+
+import org.apache.catalina.tribes.io.ListenCallback;
+
+
+
+
+/**
+ * @author Filip Hanik
+ * @version $Revision$ $Date$
+ */
+public abstract class WorkerThread extends Thread
+{
+
+ public static final int OPTION_DIRECT_BUFFER = ReceiverBase.OPTION_DIRECT_BUFFER;
+
+ private ListenCallback callback;
+ private ThreadPool pool;
+ private boolean doRun = true;
+ private int options;
+ protected boolean useBufferPool = true;
+
+ public WorkerThread(ListenCallback callback) {
+ this.callback = callback;
+ }
+
+ public void setPool(ThreadPool pool) {
+ this.pool = pool;
+ }
+
+ public void setOptions(int options) {
+ this.options = options;
+ }
+
+ public void setCallback(ListenCallback callback) {
+ this.callback = callback;
+ }
+
+ public void setDoRun(boolean doRun) {
+ this.doRun = doRun;
+ }
+
+ public ThreadPool getPool() {
+ return pool;
+ }
+
+ public int getOptions() {
+ return options;
+ }
+
+ public ListenCallback getCallback() {
+ return callback;
+ }
+
+ public boolean isDoRun() {
+ return doRun;
+ }
+
+ public void close()
+ {
+ doRun = false;
+ notify();
+ }
+
+ public void setUseBufferPool(boolean usebufpool) {
+ useBufferPool = usebufpool;
+ }
+
+ public boolean getUseBufferPool() {
+ return useBufferPool;
+ }
+}
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/WorkerThread.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/WorkerThread.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReceiver.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReceiver.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java?view=diff&rev=467206&r1=467205&r2=467206
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java Mon Oct 23 19:45:46 2006
@@ -1,181 +1,181 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.catalina.tribes.transport.bio;
-import java.io.IOException;
-
-import org.apache.catalina.tribes.io.ObjectReader;
-import org.apache.catalina.tribes.transport.Constants;
-import org.apache.catalina.tribes.transport.WorkerThread;
-import java.net.Socket;
-import java.io.InputStream;
-import org.apache.catalina.tribes.transport.ReceiverBase;
-import java.io.OutputStream;
-import org.apache.catalina.tribes.io.ListenCallback;
-import org.apache.catalina.tribes.ChannelMessage;
-import org.apache.catalina.tribes.io.ChannelData;
-import org.apache.catalina.tribes.io.BufferPool;
-
-/**
- * A worker thread class which can drain channels and echo-back the input. Each
- * instance is constructed with a reference to the owning thread pool object.
- * When started, the thread loops forever waiting to be awakened to service the
- * channel associated with a SelectionKey object. The worker is tasked by
- * calling its serviceChannel() method with a SelectionKey object. The
- * serviceChannel() method stores the key reference in the thread object then
- * calls notify() to wake it up. When the channel has been drained, the worker
- * thread returns itself to its parent pool.
- *
- * @author Filip Hanik
- *
- * @version $Revision: 378050 $, $Date: 2006-02-15 12:30:02 -0600 (Wed, 15 Feb 2006) $
- */
-public class BioReplicationThread extends WorkerThread {
-
-
- protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog( BioReplicationThread.class );
-
- protected Socket socket;
- protected ObjectReader reader;
-
- public BioReplicationThread (ListenCallback callback) {
- super(callback);
- }
-
- // loop forever waiting for work to do
- public synchronized void run()
- {
- this.notify();
- while (isDoRun()) {
- try {
- // sleep and release object lock
- this.wait();
- } catch (InterruptedException e) {
- if(log.isInfoEnabled())
- log.info("TCP worker thread interrupted in cluster",e);
- // clear interrupt status
- Thread.interrupted();
- }
- if ( socket == null ) continue;
- try {
- drainSocket();
- } catch ( Exception x ) {
- log.error("Unable to service bio socket");
- }finally {
- try {socket.close();}catch ( Exception ignore){}
- try {reader.close();}catch ( Exception ignore){}
- reader = null;
- socket = null;
- }
- // done, ready for more, return to pool
- if ( getPool() != null ) getPool().returnWorker (this);
- else setDoRun(false);
- }
- }
-
-
- public synchronized void serviceSocket(Socket socket, ObjectReader reader) {
- this.socket = socket;
- this.reader = reader;
- this.notify(); // awaken the thread
- }
-
- protected void execute(ObjectReader reader) throws Exception{
- int pkgcnt = reader.count();
-
- if ( pkgcnt > 0 ) {
- ChannelMessage[] msgs = reader.execute();
- for ( int i=0; i<msgs.length; i++ ) {
- /**
- * Use send ack here if you want to ack the request to the remote
- * server before completing the request
- * This is considered an asynchronized request
- */
- if (ChannelData.sendAckAsync(msgs[i].getOptions())) sendAck(Constants.ACK_COMMAND);
- try {
- //process the message
- getCallback().messageDataReceived(msgs[i]);
- /**
- * Use send ack here if you want the request to complete on this
- * server before sending the ack to the remote server
- * This is considered a synchronized request
- */
- if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(Constants.ACK_COMMAND);
- }catch ( Exception x ) {
- if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(Constants.FAIL_ACK_COMMAND);
- log.error("Error thrown from messageDataReceived.",x);
- }
- if ( getUseBufferPool() ) {
- BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage());
- msgs[i].setMessage(null);
- }
- }
- }
-
-
- }
-
- /**
- * The actual code which drains the channel associated with
- * the given key. This method assumes the key has been
- * modified prior to invocation to turn off selection
- * interest in OP_READ. When this method completes it
- * re-enables OP_READ and calls wakeup() on the selector
- * so the selector will resume watching this channel.
- */
- protected void drainSocket () throws Exception {
- InputStream in = socket.getInputStream();
- // loop while data available, channel is non-blocking
- byte[] buf = new byte[1024];
- int length = in.read(buf);
- while ( length >= 0 ) {
- int count = reader.append(buf,0,length,true);
- if ( count > 0 ) execute(reader);
- length = in.read(buf);
- }
- }
-
-
-
-
- /**
- * send a reply-acknowledgement (6,2,3)
- * @param key
- * @param channel
- */
- protected void sendAck(byte[] command) {
- try {
- OutputStream out = socket.getOutputStream();
- out.write(command);
- out.flush();
- if (log.isTraceEnabled()) {
- log.trace("ACK sent to " + socket.getPort());
- }
- } catch ( java.io.IOException x ) {
- log.warn("Unable to send ACK back through channel, channel disconnected?: "+x.getMessage());
- }
- }
-
- public void close() {
- setDoRun(false);
- try {socket.close();}catch ( Exception ignore){}
- try {reader.close();}catch ( Exception ignore){}
- reader = null;
- socket = null;
- super.close();
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.transport.bio;
+import java.io.IOException;
+
+import org.apache.catalina.tribes.io.ObjectReader;
+import org.apache.catalina.tribes.transport.Constants;
+import org.apache.catalina.tribes.transport.WorkerThread;
+import java.net.Socket;
+import java.io.InputStream;
+import org.apache.catalina.tribes.transport.ReceiverBase;
+import java.io.OutputStream;
+import org.apache.catalina.tribes.io.ListenCallback;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.io.ChannelData;
+import org.apache.catalina.tribes.io.BufferPool;
+
+/**
+ * A worker thread class which can drain channels and echo-back the input. Each
+ * instance is constructed with a reference to the owning thread pool object.
+ * When started, the thread loops forever waiting to be awakened to service the
+ * channel associated with a SelectionKey object. The worker is tasked by
+ * calling its serviceChannel() method with a SelectionKey object. The
+ * serviceChannel() method stores the key reference in the thread object then
+ * calls notify() to wake it up. When the channel has been drained, the worker
+ * thread returns itself to its parent pool.
+ *
+ * @author Filip Hanik
+ *
+ * @version $Revision$, $Date$
+ */
+public class BioReplicationThread extends WorkerThread {
+
+
+ protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog( BioReplicationThread.class );
+
+ protected Socket socket;
+ protected ObjectReader reader;
+
+ public BioReplicationThread (ListenCallback callback) {
+ super(callback);
+ }
+
+ // loop forever waiting for work to do
+ public synchronized void run()
+ {
+ this.notify();
+ while (isDoRun()) {
+ try {
+ // sleep and release object lock
+ this.wait();
+ } catch (InterruptedException e) {
+ if(log.isInfoEnabled())
+ log.info("TCP worker thread interrupted in cluster",e);
+ // clear interrupt status
+ Thread.interrupted();
+ }
+ if ( socket == null ) continue;
+ try {
+ drainSocket();
+ } catch ( Exception x ) {
+ log.error("Unable to service bio socket");
+ }finally {
+ try {socket.close();}catch ( Exception ignore){}
+ try {reader.close();}catch ( Exception ignore){}
+ reader = null;
+ socket = null;
+ }
+ // done, ready for more, return to pool
+ if ( getPool() != null ) getPool().returnWorker (this);
+ else setDoRun(false);
+ }
+ }
+
+
+ public synchronized void serviceSocket(Socket socket, ObjectReader reader) {
+ this.socket = socket;
+ this.reader = reader;
+ this.notify(); // awaken the thread
+ }
+
+ protected void execute(ObjectReader reader) throws Exception{
+ int pkgcnt = reader.count();
+
+ if ( pkgcnt > 0 ) {
+ ChannelMessage[] msgs = reader.execute();
+ for ( int i=0; i<msgs.length; i++ ) {
+ /**
+ * Use send ack here if you want to ack the request to the remote
+ * server before completing the request
+ * This is considered an asynchronized request
+ */
+ if (ChannelData.sendAckAsync(msgs[i].getOptions())) sendAck(Constants.ACK_COMMAND);
+ try {
+ //process the message
+ getCallback().messageDataReceived(msgs[i]);
+ /**
+ * Use send ack here if you want the request to complete on this
+ * server before sending the ack to the remote server
+ * This is considered a synchronized request
+ */
+ if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(Constants.ACK_COMMAND);
+ }catch ( Exception x ) {
+ if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(Constants.FAIL_ACK_COMMAND);
+ log.error("Error thrown from messageDataReceived.",x);
+ }
+ if ( getUseBufferPool() ) {
+ BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage());
+ msgs[i].setMessage(null);
+ }
+ }
+ }
+
+
+ }
+
+ /**
+ * The actual code which drains the channel associated with
+ * the given key. This method assumes the key has been
+ * modified prior to invocation to turn off selection
+ * interest in OP_READ. When this method completes it
+ * re-enables OP_READ and calls wakeup() on the selector
+ * so the selector will resume watching this channel.
+ */
+ protected void drainSocket () throws Exception {
+ InputStream in = socket.getInputStream();
+ // loop while data available, channel is non-blocking
+ byte[] buf = new byte[1024];
+ int length = in.read(buf);
+ while ( length >= 0 ) {
+ int count = reader.append(buf,0,length,true);
+ if ( count > 0 ) execute(reader);
+ length = in.read(buf);
+ }
+ }
+
+
+
+
+ /**
+ * send a reply-acknowledgement (6,2,3)
+ * @param key
+ * @param channel
+ */
+ protected void sendAck(byte[] command) {
+ try {
+ OutputStream out = socket.getOutputStream();
+ out.write(command);
+ out.flush();
+ if (log.isTraceEnabled()) {
+ log.trace("ACK sent to " + socket.getPort());
+ }
+ } catch ( java.io.IOException x ) {
+ log.warn("Unable to send ACK back through channel, channel disconnected?: "+x.getMessage());
+ }
+ }
+
+ public void close() {
+ setDoRun(false);
+ try {socket.close();}catch ( Exception ignore){}
+ try {reader.close();}catch ( Exception ignore){}
+ reader = null;
+ socket = null;
+ super.close();
+ }
+}
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioSender.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioSender.java?view=diff&rev=467206&r1=467205&r2=467206
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioSender.java Mon Oct 23 19:45:46 2006
@@ -39,7 +39,7 @@
*
* @author Peter Rossbach
* @author Filip Hanik
- * @version $Revision: 377484 $ $Date: 2006-02-13 15:00:05 -0600 (Mon, 13 Feb 2006) $
+ * @version $Revision$ $Date$
* @since 5.5.16
*/
public class BioSender extends AbstractSender implements DataSender {
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioSender.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioSender.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/MultipointBioSender.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/MultipointBioSender.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/PooledMultiSender.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/PooledMultiSender.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/util/FastQueue.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/util/FastQueue.java?view=diff&rev=467206&r1=467205&r2=467206
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/util/FastQueue.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/util/FastQueue.java Mon Oct 23 19:45:46 2006
@@ -1,394 +1,394 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.catalina.tribes.transport.bio.util;
-
-import org.apache.catalina.tribes.ChannelMessage;
-import org.apache.catalina.tribes.ErrorHandler;
-import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.group.InterceptorPayload;
-
-
-
-/**
- * A fast queue that remover thread lock the adder thread. <br/>Limit the queue
- * length when you have strange producer thread problemes.
- *
- * FIXME add i18n support to log messages
- * @author Rainer Jung
- * @author Peter Rossbach
- * @version $Revision: 345567 $ $Date: 2005-11-18 15:07:23 -0600 (Fri, 18 Nov 2005) $
- */
-public class FastQueue {
-
- private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(FastQueue.class);
-
- /**
- * This is the actual queue
- */
- private SingleRemoveSynchronizedAddLock lock = null;
-
- /**
- * First Object at queue (consumer message)
- */
- private LinkObject first = null;
-
- /**
- * Last object in queue (producer Object)
- */
- private LinkObject last = null;
-
- /**
- * Current Queue elements size
- */
- private int size = 0;
-
- /**
- * check lock to detect strange threadings things
- */
- private boolean checkLock = false;
-
- /**
- * protocol the thread wait times
- */
- private boolean timeWait = false;
-
- private boolean inAdd = false;
-
- private boolean inRemove = false;
-
- private boolean inMutex = false;
-
- /**
- * limit the queue legnth ( default is unlimited)
- */
- private int maxQueueLength = 0;
-
- /**
- * addWaitTimeout for producer
- */
- private long addWaitTimeout = 10000L;
-
-
- /**
- * removeWaitTimeout for consumer
- */
- private long removeWaitTimeout = 30000L;
-
- /**
- * enabled the queue
- */
- private boolean enabled = true;
-
- /**
- * max queue size
- */
- private int maxSize = 0;
-
- /**
- * avg size sample interval
- */
- private int sampleInterval = 100;
-
- /**
- * Generate Queue SingleRemoveSynchronizedAddLock and set add and wait
- * Timeouts
- */
- public FastQueue() {
- lock = new SingleRemoveSynchronizedAddLock();
- lock.setAddWaitTimeout(addWaitTimeout);
- lock.setRemoveWaitTimeout(removeWaitTimeout);
- }
-
- /**
- * get current add wait timeout
- *
- * @return current wait timeout
- */
- public long getAddWaitTimeout() {
- addWaitTimeout = lock.getAddWaitTimeout();
- return addWaitTimeout;
- }
-
- /**
- * Set add wait timeout (default 10000 msec)
- *
- * @param timeout
- */
- public void setAddWaitTimeout(long timeout) {
- addWaitTimeout = timeout;
- lock.setAddWaitTimeout(addWaitTimeout);
- }
-
- /**
- * get current remove wait timeout
- *
- * @return The timeout
- */
- public long getRemoveWaitTimeout() {
- removeWaitTimeout = lock.getRemoveWaitTimeout();
- return removeWaitTimeout;
- }
-
- /**
- * set remove wait timeout ( default 30000 msec)
- *
- * @param timeout
- */
- public void setRemoveWaitTimeout(long timeout) {
- removeWaitTimeout = timeout;
- lock.setRemoveWaitTimeout(removeWaitTimeout);
- }
-
- /**
- * get Max Queue length
- *
- * @see org.apache.catalina.tribes.util.IQueue#getMaxQueueLength()
- */
- public int getMaxQueueLength() {
- return maxQueueLength;
- }
-
- public void setMaxQueueLength(int length) {
- maxQueueLength = length;
- }
-
- public boolean isEnabled() {
- return enabled;
- }
-
- public void setEnabled(boolean enable) {
- enabled = enable;
- if (!enabled) {
- lock.abortRemove();
- last = first = null;
- }
- }
-
- /**
- * @return Returns the checkLock.
- */
- public boolean isCheckLock() {
- return checkLock;
- }
-
- /**
- * @param checkLock The checkLock to set.
- */
- public void setCheckLock(boolean checkLock) {
- this.checkLock = checkLock;
- }
-
-
- /**
- * @return The max size
- */
- public int getMaxSize() {
- return maxSize;
- }
-
- /**
- * @param size
- */
- public void setMaxSize(int size) {
- maxSize = size;
- }
-
-
- /**
- * unlock queue for next add
- */
- public void unlockAdd() {
- lock.unlockAdd(size > 0 ? true : false);
- }
-
- /**
- * unlock queue for next remove
- */
- public void unlockRemove() {
- lock.unlockRemove();
- }
-
- /**
- * start queuing
- */
- public void start() {
- setEnabled(true);
- }
-
- /**
- * start queuing
- */
- public void stop() {
- setEnabled(false);
- }
-
- public int getSize() {
- return size;
- }
-
- public SingleRemoveSynchronizedAddLock getLock() {
- return lock;
- }
-
- /**
- * Add new data to the queue
- * @see org.apache.catalina.tribes.util.IQueue#add(java.lang.String, java.lang.Object)
- * FIXME extract some method
- */
- public boolean add(ChannelMessage msg, Member[] destination, InterceptorPayload payload) {
- boolean ok = true;
- long time = 0;
-
- if (!enabled) {
- if (log.isInfoEnabled())
- log.info("FastQueue.add: queue disabled, add aborted");
- return false;
- }
-
- if (timeWait) {
- time = System.currentTimeMillis();
- }
- lock.lockAdd();
- try {
- if (log.isTraceEnabled()) {
- log.trace("FastQueue.add: starting with size " + size);
- }
- if (checkLock) {
- if (inAdd)
- log.warn("FastQueue.add: Detected other add");
- inAdd = true;
- if (inMutex)
- log.warn("FastQueue.add: Detected other mutex in add");
- inMutex = true;
- }
-
- if ((maxQueueLength > 0) && (size >= maxQueueLength)) {
- ok = false;
- if (log.isTraceEnabled()) {
- log.trace("FastQueue.add: Could not add, since queue is full (" + size + ">=" + maxQueueLength + ")");
- }
- } else {
- LinkObject element = new LinkObject(msg,destination, payload);
- if (size == 0) {
- first = last = element;
- size = 1;
- } else {
- if (last == null) {
- ok = false;
- log.error("FastQueue.add: Could not add, since last is null although size is "+ size + " (>0)");
- } else {
- last.append(element);
- last = element;
- size++;
- }
- }
- }
-
- if (first == null) {
- log.error("FastQueue.add: first is null, size is " + size + " at end of add");
- }
- if (last == null) {
- log.error("FastQueue.add: last is null, size is " + size+ " at end of add");
- }
-
- if (checkLock) {
- if (!inMutex) log.warn("FastQueue.add: Cancelled by other mutex in add");
- inMutex = false;
- if (!inAdd) log.warn("FastQueue.add: Cancelled by other add");
- inAdd = false;
- }
- if (log.isTraceEnabled()) log.trace("FastQueue.add: add ending with size " + size);
-
- } finally {
- lock.unlockAdd(true);
- }
- return ok;
- }
-
- /**
- * remove the complete queued object list
- * @see org.apache.catalina.tribes.util.IQueue#remove()
- * FIXME extract some method
- */
- public LinkObject remove() {
- LinkObject element;
- boolean gotLock;
- long time = 0;
-
- if (!enabled) {
- if (log.isInfoEnabled())
- log.info("FastQueue.remove: queue disabled, remove aborted");
- return null;
- }
-
- if (timeWait) {
- time = System.currentTimeMillis();
- }
- gotLock = lock.lockRemove();
- try {
-
- if (!gotLock) {
- if (enabled) {
- if (log.isInfoEnabled())
- log.info("FastQueue.remove: Remove aborted although queue enabled");
- } else {
- if (log.isInfoEnabled())
- log.info("FastQueue.remove: queue disabled, remove aborted");
- }
- return null;
- }
-
- if (log.isTraceEnabled()) {
- log.trace("FastQueue.remove: remove starting with size " + size);
- }
- if (checkLock) {
- if (inRemove)
- log.warn("FastQueue.remove: Detected other remove");
- inRemove = true;
- if (inMutex)
- log.warn("FastQueue.remove: Detected other mutex in remove");
- inMutex = true;
- }
-
- element = first;
-
- first = last = null;
- size = 0;
-
- if (checkLock) {
- if (!inMutex)
- log.warn("FastQueue.remove: Cancelled by other mutex in remove");
- inMutex = false;
- if (!inRemove)
- log.warn("FastQueue.remove: Cancelled by other remove");
- inRemove = false;
- }
- if (log.isTraceEnabled()) {
- log.trace("FastQueue.remove: remove ending with size " + size);
- }
-
- if (timeWait) {
- time = System.currentTimeMillis();
- }
- } finally {
- lock.unlockRemove();
- }
- return element;
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.transport.bio.util;
+
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.ErrorHandler;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.InterceptorPayload;
+
+
+
+/**
+ * A fast queue that remover thread lock the adder thread. <br/>Limit the queue
+ * length when you have strange producer thread problemes.
+ *
+ * FIXME add i18n support to log messages
+ * @author Rainer Jung
+ * @author Peter Rossbach
+ * @version $Revision$ $Date$
+ */
+public class FastQueue {
+
+ private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(FastQueue.class);
+
+ /**
+ * This is the actual queue
+ */
+ private SingleRemoveSynchronizedAddLock lock = null;
+
+ /**
+ * First Object at queue (consumer message)
+ */
+ private LinkObject first = null;
+
+ /**
+ * Last object in queue (producer Object)
+ */
+ private LinkObject last = null;
+
+ /**
+ * Current Queue elements size
+ */
+ private int size = 0;
+
+ /**
+ * check lock to detect strange threadings things
+ */
+ private boolean checkLock = false;
+
+ /**
+ * protocol the thread wait times
+ */
+ private boolean timeWait = false;
+
+ private boolean inAdd = false;
+
+ private boolean inRemove = false;
+
+ private boolean inMutex = false;
+
+ /**
+ * limit the queue legnth ( default is unlimited)
+ */
+ private int maxQueueLength = 0;
+
+ /**
+ * addWaitTimeout for producer
+ */
+ private long addWaitTimeout = 10000L;
+
+
+ /**
+ * removeWaitTimeout for consumer
+ */
+ private long removeWaitTimeout = 30000L;
+
+ /**
+ * enabled the queue
+ */
+ private boolean enabled = true;
+
+ /**
+ * max queue size
+ */
+ private int maxSize = 0;
+
+ /**
+ * avg size sample interval
+ */
+ private int sampleInterval = 100;
+
+ /**
+ * Generate Queue SingleRemoveSynchronizedAddLock and set add and wait
+ * Timeouts
+ */
+ public FastQueue() {
+ lock = new SingleRemoveSynchronizedAddLock();
+ lock.setAddWaitTimeout(addWaitTimeout);
+ lock.setRemoveWaitTimeout(removeWaitTimeout);
+ }
+
+ /**
+ * get current add wait timeout
+ *
+ * @return current wait timeout
+ */
+ public long getAddWaitTimeout() {
+ addWaitTimeout = lock.getAddWaitTimeout();
+ return addWaitTimeout;
+ }
+
+ /**
+ * Set add wait timeout (default 10000 msec)
+ *
+ * @param timeout
+ */
+ public void setAddWaitTimeout(long timeout) {
+ addWaitTimeout = timeout;
+ lock.setAddWaitTimeout(addWaitTimeout);
+ }
+
+ /**
+ * get current remove wait timeout
+ *
+ * @return The timeout
+ */
+ public long getRemoveWaitTimeout() {
+ removeWaitTimeout = lock.getRemoveWaitTimeout();
+ return removeWaitTimeout;
+ }
+
+ /**
+ * set remove wait timeout ( default 30000 msec)
+ *
+ * @param timeout
+ */
+ public void setRemoveWaitTimeout(long timeout) {
+ removeWaitTimeout = timeout;
+ lock.setRemoveWaitTimeout(removeWaitTimeout);
+ }
+
+ /**
+ * get Max Queue length
+ *
+ * @see org.apache.catalina.tribes.util.IQueue#getMaxQueueLength()
+ */
+ public int getMaxQueueLength() {
+ return maxQueueLength;
+ }
+
+ public void setMaxQueueLength(int length) {
+ maxQueueLength = length;
+ }
+
+ public boolean isEnabled() {
+ return enabled;
+ }
+
+ public void setEnabled(boolean enable) {
+ enabled = enable;
+ if (!enabled) {
+ lock.abortRemove();
+ last = first = null;
+ }
+ }
+
+ /**
+ * @return Returns the checkLock.
+ */
+ public boolean isCheckLock() {
+ return checkLock;
+ }
+
+ /**
+ * @param checkLock The checkLock to set.
+ */
+ public void setCheckLock(boolean checkLock) {
+ this.checkLock = checkLock;
+ }
+
+
+ /**
+ * @return The max size
+ */
+ public int getMaxSize() {
+ return maxSize;
+ }
+
+ /**
+ * @param size
+ */
+ public void setMaxSize(int size) {
+ maxSize = size;
+ }
+
+
+ /**
+ * unlock queue for next add
+ */
+ public void unlockAdd() {
+ lock.unlockAdd(size > 0 ? true : false);
+ }
+
+ /**
+ * unlock queue for next remove
+ */
+ public void unlockRemove() {
+ lock.unlockRemove();
+ }
+
+ /**
+ * start queuing
+ */
+ public void start() {
+ setEnabled(true);
+ }
+
+ /**
+ * start queuing
+ */
+ public void stop() {
+ setEnabled(false);
+ }
+
+ public int getSize() {
+ return size;
+ }
+
+ public SingleRemoveSynchronizedAddLock getLock() {
+ return lock;
+ }
+
+ /**
+ * Add new data to the queue
+ * @see org.apache.catalina.tribes.util.IQueue#add(java.lang.String, java.lang.Object)
+ * FIXME extract some method
+ */
+ public boolean add(ChannelMessage msg, Member[] destination, InterceptorPayload payload) {
+ boolean ok = true;
+ long time = 0;
+
+ if (!enabled) {
+ if (log.isInfoEnabled())
+ log.info("FastQueue.add: queue disabled, add aborted");
+ return false;
+ }
+
+ if (timeWait) {
+ time = System.currentTimeMillis();
+ }
+ lock.lockAdd();
+ try {
+ if (log.isTraceEnabled()) {
+ log.trace("FastQueue.add: starting with size " + size);
+ }
+ if (checkLock) {
+ if (inAdd)
+ log.warn("FastQueue.add: Detected other add");
+ inAdd = true;
+ if (inMutex)
+ log.warn("FastQueue.add: Detected other mutex in add");
+ inMutex = true;
+ }
+
+ if ((maxQueueLength > 0) && (size >= maxQueueLength)) {
+ ok = false;
+ if (log.isTraceEnabled()) {
+ log.trace("FastQueue.add: Could not add, since queue is full (" + size + ">=" + maxQueueLength + ")");
+ }
+ } else {
+ LinkObject element = new LinkObject(msg,destination, payload);
+ if (size == 0) {
+ first = last = element;
+ size = 1;
+ } else {
+ if (last == null) {
+ ok = false;
+ log.error("FastQueue.add: Could not add, since last is null although size is "+ size + " (>0)");
+ } else {
+ last.append(element);
+ last = element;
+ size++;
+ }
+ }
+ }
+
+ if (first == null) {
+ log.error("FastQueue.add: first is null, size is " + size + " at end of add");
+ }
+ if (last == null) {
+ log.error("FastQueue.add: last is null, size is " + size+ " at end of add");
+ }
+
+ if (checkLock) {
+ if (!inMutex) log.warn("FastQueue.add: Cancelled by other mutex in add");
+ inMutex = false;
+ if (!inAdd) log.warn("FastQueue.add: Cancelled by other add");
+ inAdd = false;
+ }
+ if (log.isTraceEnabled()) log.trace("FastQueue.add: add ending with size " + size);
+
+ } finally {
+ lock.unlockAdd(true);
+ }
+ return ok;
+ }
+
+ /**
+ * remove the complete queued object list
+ * @see org.apache.catalina.tribes.util.IQueue#remove()
+ * FIXME extract some method
+ */
+ public LinkObject remove() {
+ LinkObject element;
+ boolean gotLock;
+ long time = 0;
+
+ if (!enabled) {
+ if (log.isInfoEnabled())
+ log.info("FastQueue.remove: queue disabled, remove aborted");
+ return null;
+ }
+
+ if (timeWait) {
+ time = System.currentTimeMillis();
+ }
+ gotLock = lock.lockRemove();
+ try {
+
+ if (!gotLock) {
+ if (enabled) {
+ if (log.isInfoEnabled())
+ log.info("FastQueue.remove: Remove aborted although queue enabled");
+ } else {
+ if (log.isInfoEnabled())
+ log.info("FastQueue.remove: queue disabled, remove aborted");
+ }
+ return null;
+ }
+
+ if (log.isTraceEnabled()) {
+ log.trace("FastQueue.remove: remove starting with size " + size);
+ }
+ if (checkLock) {
+ if (inRemove)
+ log.warn("FastQueue.remove: Detected other remove");
+ inRemove = true;
+ if (inMutex)
+ log.warn("FastQueue.remove: Detected other mutex in remove");
+ inMutex = true;
+ }
+
+ element = first;
+
+ first = last = null;
+ size = 0;
+
+ if (checkLock) {
+ if (!inMutex)
+ log.warn("FastQueue.remove: Cancelled by other mutex in remove");
+ inMutex = false;
+ if (!inRemove)
+ log.warn("FastQueue.remove: Cancelled by other remove");
+ inRemove = false;
+ }
+ if (log.isTraceEnabled()) {
+ log.trace("FastQueue.remove: remove ending with size " + size);
+ }
+
+ if (timeWait) {
+ time = System.currentTimeMillis();
+ }
+ } finally {
+ lock.unlockRemove();
+ }
+ return element;
+ }
+
+}
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/util/FastQueue.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/util/FastQueue.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/util/LinkObject.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/util/LinkObject.java?view=diff&rev=467206&r1=467205&r2=467206
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/util/LinkObject.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/util/LinkObject.java Mon Oct 23 19:45:46 2006
@@ -1,108 +1,108 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.catalina.tribes.transport.bio.util;
-
-import org.apache.catalina.tribes.ChannelMessage;
-import org.apache.catalina.tribes.ErrorHandler;
-import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.group.InterceptorPayload;
-
-/**
- * The class <b>LinkObject</b> implements an element
- * for a linked list, consisting of a general
- * data object and a pointer to the next element.
- *
- * @author Rainer Jung
- * @author Peter Rossbach
- * @author Filip Hanik
- * @version $Revision: 304032 $ $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul 2005) $
-
- */
-
-public class LinkObject {
-
- private ChannelMessage msg;
- private LinkObject next;
- private byte[] key ;
- private Member[] destination;
- private InterceptorPayload payload;
-
- /**
- * Construct a new element from the data object.
- * Sets the pointer to null.
- *
- * @param key The key
- * @param payload The data object.
- */
- public LinkObject(ChannelMessage msg, Member[] destination, InterceptorPayload payload) {
- this.msg = msg;
- this.next = null;
- this.key = msg.getUniqueId();
- this.payload = payload;
- this.destination = destination;
- }
-
- /**
- * Set the next element.
- * @param next The next element.
- */
- public void append(LinkObject next) {
- this.next = next;
- }
-
- /**
- * Get the next element.
- * @return The next element.
- */
- public LinkObject next() {
- return next;
- }
-
- public void setNext(LinkObject next) {
- this.next = next;
- }
-
- /**
- * Get the data object from the element.
- * @return The data object from the element.
- */
- public ChannelMessage data() {
- return msg;
- }
-
- /**
- * Get the unique message id
- * @return the unique message id
- */
- public byte[] getKey() {
- return key;
- }
-
- public ErrorHandler getHandler() {
- return payload!=null?payload.getErrorHandler():null;
- }
-
- public InterceptorPayload getPayload() {
- return payload;
- }
-
- public Member[] getDestination() {
- return destination;
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.transport.bio.util;
+
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.ErrorHandler;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.InterceptorPayload;
+
+/**
+ * The class <b>LinkObject</b> implements an element
+ * for a linked list, consisting of a general
+ * data object and a pointer to the next element.
+ *
+ * @author Rainer Jung
+ * @author Peter Rossbach
+ * @author Filip Hanik
+ * @version $Revision$ $Date$
+
+ */
+
+public class LinkObject {
+
+ private ChannelMessage msg;
+ private LinkObject next;
+ private byte[] key ;
+ private Member[] destination;
+ private InterceptorPayload payload;
+
+ /**
+ * Construct a new element from the data object.
+ * Sets the pointer to null.
+ *
+ * @param key The key
+ * @param payload The data object.
+ */
+ public LinkObject(ChannelMessage msg, Member[] destination, InterceptorPayload payload) {
+ this.msg = msg;
+ this.next = null;
+ this.key = msg.getUniqueId();
+ this.payload = payload;
+ this.destination = destination;
+ }
+
+ /**
+ * Set the next element.
+ * @param next The next element.
+ */
+ public void append(LinkObject next) {
+ this.next = next;
+ }
+
+ /**
+ * Get the next element.
+ * @return The next element.
+ */
+ public LinkObject next() {
+ return next;
+ }
+
+ public void setNext(LinkObject next) {
+ this.next = next;
+ }
+
+ /**
+ * Get the data object from the element.
+ * @return The data object from the element.
+ */
+ public ChannelMessage data() {
+ return msg;
+ }
+
+ /**
+ * Get the unique message id
+ * @return the unique message id
+ */
+ public byte[] getKey() {
+ return key;
+ }
+
+ public ErrorHandler getHandler() {
+ return payload!=null?payload.getErrorHandler():null;
+ }
+
+ public InterceptorPayload getPayload() {
+ return payload;
+ }
+
+ public Member[] getDestination() {
+ return destination;
+ }
+
+}
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/util/LinkObject.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/util/LinkObject.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/util/SingleRemoveSynchronizedAddLock.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/util/SingleRemoveSynchronizedAddLock.java?view=diff&rev=467206&r1=467205&r2=467206
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/util/SingleRemoveSynchronizedAddLock.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/util/SingleRemoveSynchronizedAddLock.java Mon Oct 23 19:45:46 2006
@@ -1,254 +1,254 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.catalina.tribes.transport.bio.util;
-
-/**
- * The class <b>SingleRemoveSynchronizedAddLock</b> implement locking for accessing the queue
- * by a single remove thread and multiple add threads.
- *
- * A thread is only allowed to be either the remove or
- * an add thread.
- *
- * The lock can either be owned by the remove thread
- * or by a single add thread.
- *
- * If the remove thread tries to get the lock,
- * but the queue is empty, it will block (poll)
- * until an add threads adds an entry to the queue and
- * releases the lock.
- *
- * If the remove thread and add threads compete for
- * the lock and an add thread releases the lock, then
- * the remove thread will get the lock first.
- *
- * The remove thread removes all entries in the queue
- * at once and proceeses them without further
- * polling the queue.
- *
- * The lock is not reentrant, in the sense, that all
- * threads must release an owned lock before competing
- * for the lock again!
- *
- * @author Rainer Jung
- * @author Peter Rossbach
- * @version 1.1
- */
-
-public class SingleRemoveSynchronizedAddLock {
-
- public SingleRemoveSynchronizedAddLock() {
- }
-
- public SingleRemoveSynchronizedAddLock(boolean dataAvailable) {
- this.dataAvailable=dataAvailable;
- }
-
- /**
- * Time in milliseconds after which threads
- * waiting for an add lock are woken up.
- * This is used as a safety measure in case
- * thread notification via the unlock methods
- * has a bug.
- */
- private long addWaitTimeout = 10000L;
-
- /**
- * Time in milliseconds after which threads
- * waiting for a remove lock are woken up.
- * This is used as a safety measure in case
- * thread notification via the unlock methods
- * has a bug.
- */
- private long removeWaitTimeout = 30000L;
-
- /**
- * The current remove thread.
- * It is set to the remove thread polling for entries.
- * It is reset to null when the remove thread
- * releases the lock and proceeds processing
- * the removed entries.
- */
- private Thread remover = null;
-
- /**
- * A flag indicating, if an add thread owns the lock.
- */
- private boolean addLocked = false;
-
- /**
- * A flag indicating, if the remove thread owns the lock.
- */
- private boolean removeLocked = false;
-
- /**
- * A flag indicating, if the remove thread is allowed
- * to wait for the lock. The flag is set to false, when aborting.
- */
- private boolean removeEnabled = true;
-
- /**
- * A flag indicating, if the remover needs polling.
- * It indicates, if the locked object has data available
- * to be removed.
- */
- private boolean dataAvailable = false;
-
- /**
- * @return Value of addWaitTimeout
- */
- public synchronized long getAddWaitTimeout() {
- return addWaitTimeout;
- }
-
- /**
- * Set value of addWaitTimeout
- */
- public synchronized void setAddWaitTimeout(long timeout) {
- addWaitTimeout = timeout;
- }
-
- /**
- * @return Value of removeWaitTimeout
- */
- public synchronized long getRemoveWaitTimeout() {
- return removeWaitTimeout;
- }
-
- /**
- * Set value of removeWaitTimeout
- */
- public synchronized void setRemoveWaitTimeout(long timeout) {
- removeWaitTimeout = timeout;
- }
-
- /**
- * Check if the locked object has data available
- * i.e. the remover can stop poling and get the lock.
- * @return True iff the lock Object has data available.
- */
- public synchronized boolean isDataAvailable() {
- return dataAvailable;
- }
-
- /**
- * Check if an add thread owns the lock.
- * @return True iff an add thread owns the lock.
- */
- public synchronized boolean isAddLocked() {
- return addLocked;
- }
-
- /**
- * Check if the remove thread owns the lock.
- * @return True iff the remove thread owns the lock.
- */
- public synchronized boolean isRemoveLocked() {
- return removeLocked;
- }
-
- /**
- * Check if the remove thread is polling.
- * @return True iff the remove thread is polling.
- */
- public synchronized boolean isRemovePolling() {
- if ( remover != null ) {
- return true;
- }
- return false;
- }
-
- /**
- * Acquires the lock by an add thread and sets the add flag.
- * If any add thread or the remove thread already acquired the lock
- * this add thread will block until the lock is released.
- */
- public synchronized void lockAdd() {
- if ( addLocked || removeLocked ) {
- do {
- try {
- wait(addWaitTimeout);
- } catch ( InterruptedException e ) {
- Thread.currentThread().interrupted();
- }
- } while ( addLocked || removeLocked );
- }
- addLocked=true;
- }
-
- /**
- * Acquires the lock by the remove thread and sets the remove flag.
- * If any add thread already acquired the lock or the queue is
- * empty, the remove thread will block until the lock is released
- * and the queue is not empty.
- */
- public synchronized boolean lockRemove() {
- removeLocked=false;
- removeEnabled=true;
- if ( ( addLocked || ! dataAvailable ) && removeEnabled ) {
- remover=Thread.currentThread();
- do {
- try {
- wait(removeWaitTimeout);
- } catch ( InterruptedException e ) {
- Thread.currentThread().interrupted();
- }
- } while ( ( addLocked || ! dataAvailable ) && removeEnabled );
- remover=null;
- }
- if ( removeEnabled ) {
- removeLocked=true;
- }
- return removeLocked;
- }
-
- /**
- * Releases the lock by an add thread and reset the remove flag.
- * If the reader thread is polling, notify it.
- */
- public synchronized void unlockAdd(boolean dataAvailable) {
- addLocked=false;
- this.dataAvailable=dataAvailable;
- if ( ( remover != null ) && ( dataAvailable || ! removeEnabled ) ) {
- remover.interrupt();
- } else {
- notifyAll();
- }
- }
-
- /**
- * Releases the lock by the remove thread and reset the add flag.
- * Notify all waiting add threads,
- * that the lock has been released by the remove thread.
- */
- public synchronized void unlockRemove() {
- removeLocked=false;
- dataAvailable=false;
- notifyAll();
- }
-
- /**
- * Abort any polling remover thread
- */
- public synchronized void abortRemove() {
- removeEnabled=false;
- if ( remover != null ) {
- remover.interrupt();
- }
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.transport.bio.util;
+
+/**
+ * The class <b>SingleRemoveSynchronizedAddLock</b> implement locking for accessing the queue
+ * by a single remove thread and multiple add threads.
+ *
+ * A thread is only allowed to be either the remove or
+ * an add thread.
+ *
+ * The lock can either be owned by the remove thread
+ * or by a single add thread.
+ *
+ * If the remove thread tries to get the lock,
+ * but the queue is empty, it will block (poll)
+ * until an add threads adds an entry to the queue and
+ * releases the lock.
+ *
+ * If the remove thread and add threads compete for
+ * the lock and an add thread releases the lock, then
+ * the remove thread will get the lock first.
+ *
+ * The remove thread removes all entries in the queue
+ * at once and proceeses them without further
+ * polling the queue.
+ *
+ * The lock is not reentrant, in the sense, that all
+ * threads must release an owned lock before competing
+ * for the lock again!
+ *
+ * @author Rainer Jung
+ * @author Peter Rossbach
+ * @version 1.1
+ */
+
+public class SingleRemoveSynchronizedAddLock {
+
+ public SingleRemoveSynchronizedAddLock() {
+ }
+
+ public SingleRemoveSynchronizedAddLock(boolean dataAvailable) {
+ this.dataAvailable=dataAvailable;
+ }
+
+ /**
+ * Time in milliseconds after which threads
+ * waiting for an add lock are woken up.
+ * This is used as a safety measure in case
+ * thread notification via the unlock methods
+ * has a bug.
+ */
+ private long addWaitTimeout = 10000L;
+
+ /**
+ * Time in milliseconds after which threads
+ * waiting for a remove lock are woken up.
+ * This is used as a safety measure in case
+ * thread notification via the unlock methods
+ * has a bug.
+ */
+ private long removeWaitTimeout = 30000L;
+
+ /**
+ * The current remove thread.
+ * It is set to the remove thread polling for entries.
+ * It is reset to null when the remove thread
+ * releases the lock and proceeds processing
+ * the removed entries.
+ */
+ private Thread remover = null;
+
+ /**
+ * A flag indicating, if an add thread owns the lock.
+ */
+ private boolean addLocked = false;
+
+ /**
+ * A flag indicating, if the remove thread owns the lock.
+ */
+ private boolean removeLocked = false;
+
+ /**
+ * A flag indicating, if the remove thread is allowed
+ * to wait for the lock. The flag is set to false, when aborting.
+ */
+ private boolean removeEnabled = true;
+
+ /**
+ * A flag indicating, if the remover needs polling.
+ * It indicates, if the locked object has data available
+ * to be removed.
+ */
+ private boolean dataAvailable = false;
+
+ /**
+ * @return Value of addWaitTimeout
+ */
+ public synchronized long getAddWaitTimeout() {
+ return addWaitTimeout;
+ }
+
+ /**
+ * Set value of addWaitTimeout
+ */
+ public synchronized void setAddWaitTimeout(long timeout) {
+ addWaitTimeout = timeout;
+ }
+
+ /**
+ * @return Value of removeWaitTimeout
+ */
+ public synchronized long getRemoveWaitTimeout() {
+ return removeWaitTimeout;
+ }
+
+ /**
+ * Set value of removeWaitTimeout
+ */
+ public synchronized void setRemoveWaitTimeout(long timeout) {
+ removeWaitTimeout = timeout;
+ }
+
+ /**
+ * Check if the locked object has data available
+ * i.e. the remover can stop poling and get the lock.
+ * @return True iff the lock Object has data available.
+ */
+ public synchronized boolean isDataAvailable() {
+ return dataAvailable;
+ }
+
+ /**
+ * Check if an add thread owns the lock.
+ * @return True iff an add thread owns the lock.
+ */
+ public synchronized boolean isAddLocked() {
+ return addLocked;
+ }
+
+ /**
+ * Check if the remove thread owns the lock.
+ * @return True iff the remove thread owns the lock.
+ */
+ public synchronized boolean isRemoveLocked() {
+ return removeLocked;
+ }
+
+ /**
+ * Check if the remove thread is polling.
+ * @return True iff the remove thread is polling.
+ */
+ public synchronized boolean isRemovePolling() {
+ if ( remover != null ) {
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Acquires the lock by an add thread and sets the add flag.
+ * If any add thread or the remove thread already acquired the lock
+ * this add thread will block until the lock is released.
+ */
+ public synchronized void lockAdd() {
+ if ( addLocked || removeLocked ) {
+ do {
+ try {
+ wait(addWaitTimeout);
+ } catch ( InterruptedException e ) {
+ Thread.currentThread().interrupted();
+ }
+ } while ( addLocked || removeLocked );
+ }
+ addLocked=true;
+ }
+
+ /**
+ * Acquires the lock by the remove thread and sets the remove flag.
+ * If any add thread already acquired the lock or the queue is
+ * empty, the remove thread will block until the lock is released
+ * and the queue is not empty.
+ */
+ public synchronized boolean lockRemove() {
+ removeLocked=false;
+ removeEnabled=true;
+ if ( ( addLocked || ! dataAvailable ) && removeEnabled ) {
+ remover=Thread.currentThread();
+ do {
+ try {
+ wait(removeWaitTimeout);
+ } catch ( InterruptedException e ) {
+ Thread.currentThread().interrupted();
+ }
+ } while ( ( addLocked || ! dataAvailable ) && removeEnabled );
+ remover=null;
+ }
+ if ( removeEnabled ) {
+ removeLocked=true;
+ }
+ return removeLocked;
+ }
+
+ /**
+ * Releases the lock by an add thread and reset the remove flag.
+ * If the reader thread is polling, notify it.
+ */
+ public synchronized void unlockAdd(boolean dataAvailable) {
+ addLocked=false;
+ this.dataAvailable=dataAvailable;
+ if ( ( remover != null ) && ( dataAvailable || ! removeEnabled ) ) {
+ remover.interrupt();
+ } else {
+ notifyAll();
+ }
+ }
+
+ /**
+ * Releases the lock by the remove thread and reset the add flag.
+ * Notify all waiting add threads,
+ * that the lock has been released by the remove thread.
+ */
+ public synchronized void unlockRemove() {
+ removeLocked=false;
+ dataAvailable=false;
+ notifyAll();
+ }
+
+ /**
+ * Abort any polling remover thread
+ */
+ public synchronized void abortRemove() {
+ removeEnabled=false;
+ if ( remover != null ) {
+ remover.interrupt();
+ }
+ }
+
+}
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/util/SingleRemoveSynchronizedAddLock.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/util/SingleRemoveSynchronizedAddLock.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/mbeans-descriptors.xml
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/mbeans-descriptors.xml
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org