You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/07/02 00:52:13 UTC
svn commit: r418516 - in
/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes:
io/ChannelData.java io/ObjectReader.java io/XByteBuffer.java
transport/nio/NioReceiver.java transport/nio/NioReplicationThread.java
Author: fhanik
Date: Sat Jul 1 15:52:11 2006
New Revision: 418516
URL: http://svn.apache.org/viewvc?rev=418516&view=rev
Log:
Major improvements; there seems to be an error with the thread handling in the NioReceiver and the hand-off to the worker thread.
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java?rev=418516&r1=418515&r2=418516&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java Sat Jul 1 15:52:11 2006
@@ -232,6 +232,7 @@
data.setAddress(MemberImpl.getMember(addr));
offset += addr.length; //addr data
int xsize = XByteBuffer.toInt(xbuf.getBytesDirect(),offset);
+ offset += 4; //xsize length
System.arraycopy(xbuf.getBytesDirect(),offset,xbuf.getBytesDirect(),0,xsize);
xbuf.setLength(xsize);
data.message = xbuf;
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java?rev=418516&r1=418515&r2=418516&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java Sat Jul 1 15:52:11 2006
@@ -41,6 +41,8 @@
private XByteBuffer buffer;
protected long lastAccess = System.currentTimeMillis();
+
+ protected boolean accessed = false;
/**
* Creates an <code>ObjectReader</code> for a TCP NIO socket channel
@@ -62,6 +64,18 @@
log.warn("Unable to retrieve the socket receiver buffer size, setting to default 43800 bytes.");
this.buffer = new XByteBuffer(43800,true);
}
+ }
+
+ public synchronized void access() {
+ this.accessed = true;
+ }
+
+ public synchronized void finish() {
+ this.accessed = false;
+ }
+
+ public boolean isAccessed() {
+ return this.accessed;
}
/**
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java?rev=418516&r1=418515&r2=418516&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java Sat Jul 1 15:52:11 2006
@@ -317,6 +317,7 @@
if (psize == 0) throw new java.lang.IllegalStateException("No package exists in XByteBuffer");
int size = toInt(buf, START_DATA.length);
XByteBuffer xbuf = BufferPool.getBufferPool().getBuffer(size,false);
+ xbuf.setLength(size);
System.arraycopy(buf, START_DATA.length + 4, xbuf.getBytesDirect(), 0, size);
if (clearFromBuffer) {
int totalsize = START_DATA.length + 4 + size + END_DATA.length;
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java?rev=418516&r1=418515&r2=418516&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java Sat Jul 1 15:52:11 2006
@@ -1,321 +1,377 @@
-/*
- * Copyright 1999,2004-2005 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.catalina.tribes.transport.nio;
-
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.nio.channels.SelectableChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.util.Iterator;
-
-import org.apache.catalina.tribes.ChannelReceiver;
-import org.apache.catalina.tribes.io.ListenCallback;
-import org.apache.catalina.tribes.io.ObjectReader;
-import org.apache.catalina.tribes.transport.Constants;
-import org.apache.catalina.tribes.transport.ReceiverBase;
-import org.apache.catalina.tribes.transport.ThreadPool;
-import org.apache.catalina.tribes.transport.WorkerThread;
-import org.apache.catalina.tribes.util.StringManager;
-import java.util.LinkedList;
-
-/**
- * @author Filip Hanik
- * @version $Revision: 379904 $ $Date: 2006-02-22 15:16:25 -0600 (Wed, 22 Feb 2006) $
- */
-public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiver, ListenCallback {
-
- protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(NioReceiver.class);
-
- /**
- * The string manager for this package.
- */
- protected StringManager sm = StringManager.getManager(Constants.Package);
-
- /**
- * The descriptive information about this implementation.
- */
- private static final String info = "NioReceiver/1.0";
-
- private Selector selector = null;
- private ServerSocketChannel serverChannel = null;
-
- protected LinkedList events = new LinkedList();
-// private Object interestOpsMutex = new Object();
-
- public NioReceiver() {
- }
-
- /**
- * Return descriptive information about this implementation and the
- * corresponding version number, in the format
- * <code><description>/<version></code>.
- */
- public String getInfo() {
- return (info);
- }
-
-// public Object getInterestOpsMutex() {
-// return interestOpsMutex;
-// }
-
- public void stop() {
- this.stopListening();
- }
-
- /**
- * start cluster receiver
- * @throws Exception
- * @see org.apache.catalina.tribes.ClusterReceiver#start()
- */
- public void start() throws IOException {
- try {
-// setPool(new ThreadPool(interestOpsMutex, getMaxThreads(),getMinThreads(),this));
- setPool(new ThreadPool(getMaxThreads(),getMinThreads(),this));
- } catch (Exception x) {
- log.fatal("ThreadPool can initilzed. Listener not started", x);
- if ( x instanceof IOException ) throw (IOException)x;
- else throw new IOException(x.getMessage());
- }
- try {
- getBind();
- bind();
- Thread t = new Thread(this, "NioReceiver");
- t.setDaemon(true);
- t.start();
- } catch (Exception x) {
- log.fatal("Unable to start cluster receiver", x);
- if ( x instanceof IOException ) throw (IOException)x;
- else throw new IOException(x.getMessage());
- }
- }
-
- public WorkerThread getWorkerThread() {
- NioReplicationThread thread = new NioReplicationThread(this,this);
- thread.setUseBufferPool(this.getUseBufferPool());
- thread.setRxBufSize(getRxBufSize());
- thread.setOptions(getWorkerThreadOptions());
- return thread;
- }
-
-
-
- protected void bind() throws IOException {
- // allocate an unbound server socket channel
- serverChannel = ServerSocketChannel.open();
- // Get the associated ServerSocket to bind it with
- ServerSocket serverSocket = serverChannel.socket();
- // create a new Selector for use below
- selector = Selector.open();
- // set the port the server channel will listen to
- //serverSocket.bind(new InetSocketAddress(getBind(), getTcpListenPort()));
- bind(serverSocket,getTcpListenPort(),getAutoBind());
- // set non-blocking mode for the listening socket
- serverChannel.configureBlocking(false);
- // register the ServerSocketChannel with the Selector
- serverChannel.register(selector, SelectionKey.OP_ACCEPT);
-
- }
-
- public void addEvent(Runnable event) {
- if ( selector != null ) {
- synchronized (events) {
- events.add(event);
- }
- selector.wakeup();
- }
- }
-
- public void events() {
- if ( events.size() == 0 ) return;
- synchronized (events) {
- Runnable r = null;
- while ( (events.size() > 0) && (r = (Runnable)events.removeFirst()) != null ) {
- try {
- r.run();
- } catch ( Exception x ) {
- log.error("",x);
- }
- }
- events.clear();
- }
- }
-
- /**
- * get data from channel and store in byte array
- * send it to cluster
- * @throws IOException
- * @throws java.nio.channels.ClosedChannelException
- */
- protected void listen() throws Exception {
- if (doListen()) {
- log.warn("ServerSocketChannel already started");
- return;
- }
-
- setListen(true);
-
- while (doListen() && selector != null) {
- // this may block for a long time, upon return the
- // selected set contains keys of the ready channels
- try {
- events();
- int n = selector.select(getTcpSelectorTimeout());
- if (n == 0) {
- //there is a good chance that we got here
- //because the TcpReplicationThread called
- //selector wakeup().
- //if that happens, we must ensure that that
- //thread has enough time to call interestOps
-// synchronized (interestOpsMutex) {
- //if we got the lock, means there are no
- //keys trying to register for the
- //interestOps method
-// }
- continue; // nothing to do
- }
- // get an iterator over the set of selected keys
- Iterator it = selector.selectedKeys().iterator();
- // look at each key in the selected set
- while (it.hasNext()) {
- SelectionKey key = (SelectionKey) it.next();
- // Is a new connection coming in?
- if (key.isAcceptable()) {
- ServerSocketChannel server = (ServerSocketChannel) key.channel();
- SocketChannel channel = server.accept();
- channel.socket().setReceiveBufferSize(getRxBufSize());
- channel.socket().setSendBufferSize(getTxBufSize());
- channel.socket().setTcpNoDelay(getTcpNoDelay());
- channel.socket().setKeepAlive(getSoKeepAlive());
- channel.socket().setOOBInline(getOoBInline());
- channel.socket().setReuseAddress(getSoReuseAddress());
- channel.socket().setSoLinger(getSoLingerOn(),getSoLingerTime());
- channel.socket().setTrafficClass(getSoTrafficClass());
- channel.socket().setSoTimeout(getTimeout());
- Object attach = new ObjectReader(channel);
- registerChannel(selector,
- channel,
- SelectionKey.OP_READ,
- attach);
- }
- // is there data to read on this channel?
- if (key.isReadable()) {
- readDataFromSocket(key);
- } else {
- key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
- }
-
- // remove key from selected set, it's been handled
- it.remove();
- }
- } catch (java.nio.channels.ClosedSelectorException cse) {
- // ignore is normal at shutdown or stop listen socket
- } catch (java.nio.channels.CancelledKeyException nx) {
- log.warn("Replication client disconnected, error when polling key. Ignoring client.");
- } catch (Throwable x) {
- try {
- log.error("Unable to process request in NioReceiver", x);
- }catch ( Throwable tx ) {
- tx.printStackTrace();
- }
- }
-
- }
- serverChannel.close();
- if (selector != null)
- selector.close();
- }
-
- /**
- * Close Selector.
- *
- * @see org.apache.catalina.tribes.transport.ClusterReceiverBase#stopListening()
- */
- protected void stopListening() {
- // Bugzilla 37529: http://issues.apache.org/bugzilla/show_bug.cgi?id=37529
- setListen(false);
- if (selector != null) {
- try {
- for (int i = 0; i < getMaxThreads(); i++) {
- selector.wakeup();
- }
- selector.close();
- } catch (Exception x) {
- log.error("Unable to close cluster receiver selector.", x);
- } finally {
- selector = null;
- }
- }
- }
-
- // ----------------------------------------------------------
-
- /**
- * Register the given channel with the given selector for
- * the given operations of interest
- */
- protected void registerChannel(Selector selector,
- SelectableChannel channel,
- int ops,
- Object attach) throws Exception {
- if (channel == null)return; // could happen
- // set the new channel non-blocking
- channel.configureBlocking(false);
- // register it with the selector
- channel.register(selector, ops, attach);
- }
-
- /**
- * Start thread and listen
- */
- public void run() {
- try {
- listen();
- } catch (Exception x) {
- log.error("Unable to run replication listener.", x);
- }
- }
-
- // ----------------------------------------------------------
-
- /**
- * Sample data handler method for a channel with data ready to read.
- * @param key A SelectionKey object associated with a channel
- * determined by the selector to be ready for reading. If the
- * channel returns an EOF condition, it is closed here, which
- * automatically invalidates the associated key. The selector
- * will then de-register the channel on the next select call.
- */
- protected void readDataFromSocket(SelectionKey key) throws Exception {
- NioReplicationThread worker = (NioReplicationThread) getPool().getWorker();
- if (worker == null) {
- // No threads available, do nothing, the selection
- // loop will keep calling this method until a
- // thread becomes available.
- // FIXME: This design could be improved.
- if (log.isDebugEnabled())
- log.debug("No TcpReplicationThread available");
- } else {
- // invoking this wakes up the worker thread then returns
- worker.serviceChannel(key);
- }
- }
-
-
-}
+/*
+ * Copyright 1999,2004-2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.transport.nio;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+
+import org.apache.catalina.tribes.ChannelReceiver;
+import org.apache.catalina.tribes.io.ListenCallback;
+import org.apache.catalina.tribes.io.ObjectReader;
+import org.apache.catalina.tribes.transport.Constants;
+import org.apache.catalina.tribes.transport.ReceiverBase;
+import org.apache.catalina.tribes.transport.ThreadPool;
+import org.apache.catalina.tribes.transport.WorkerThread;
+import org.apache.catalina.tribes.util.StringManager;
+import java.util.LinkedList;
+import java.util.Set;
+import java.nio.channels.CancelledKeyException;
+
+/**
+ * @author Filip Hanik
+ * @version $Revision: 379904 $ $Date: 2006-02-22 15:16:25 -0600 (Wed, 22 Feb 2006) $
+ */
+public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiver, ListenCallback {
+
+ protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(NioReceiver.class);
+
+ /**
+ * The string manager for this package.
+ */
+ protected StringManager sm = StringManager.getManager(Constants.Package);
+
+ /**
+ * The descriptive information about this implementation.
+ */
+ private static final String info = "NioReceiver/1.0";
+
+ private Selector selector = null;
+ private ServerSocketChannel serverChannel = null;
+
+ protected LinkedList events = new LinkedList();
+// private Object interestOpsMutex = new Object();
+
+ public NioReceiver() {
+ }
+
+ /**
+ * Return descriptive information about this implementation and the
+ * corresponding version number, in the format
+ * <code><description>/<version></code>.
+ */
+ public String getInfo() {
+ return (info);
+ }
+
+// public Object getInterestOpsMutex() {
+// return interestOpsMutex;
+// }
+
+ public void stop() {
+ this.stopListening();
+ }
+
+ /**
+ * start cluster receiver
+ * @throws Exception
+ * @see org.apache.catalina.tribes.ClusterReceiver#start()
+ */
+ public void start() throws IOException {
+ try {
+// setPool(new ThreadPool(interestOpsMutex, getMaxThreads(),getMinThreads(),this));
+ setPool(new ThreadPool(getMaxThreads(),getMinThreads(),this));
+ } catch (Exception x) {
+ log.fatal("ThreadPool can initilzed. Listener not started", x);
+ if ( x instanceof IOException ) throw (IOException)x;
+ else throw new IOException(x.getMessage());
+ }
+ try {
+ getBind();
+ bind();
+ Thread t = new Thread(this, "NioReceiver");
+ t.setDaemon(true);
+ t.start();
+ } catch (Exception x) {
+ log.fatal("Unable to start cluster receiver", x);
+ if ( x instanceof IOException ) throw (IOException)x;
+ else throw new IOException(x.getMessage());
+ }
+ }
+
+ public WorkerThread getWorkerThread() {
+ NioReplicationThread thread = new NioReplicationThread(this,this);
+ thread.setUseBufferPool(this.getUseBufferPool());
+ thread.setRxBufSize(getRxBufSize());
+ thread.setOptions(getWorkerThreadOptions());
+ return thread;
+ }
+
+
+
+ protected void bind() throws IOException {
+ // allocate an unbound server socket channel
+ serverChannel = ServerSocketChannel.open();
+ // Get the associated ServerSocket to bind it with
+ ServerSocket serverSocket = serverChannel.socket();
+ // create a new Selector for use below
+ selector = Selector.open();
+ // set the port the server channel will listen to
+ //serverSocket.bind(new InetSocketAddress(getBind(), getTcpListenPort()));
+ bind(serverSocket,getTcpListenPort(),getAutoBind());
+ // set non-blocking mode for the listening socket
+ serverChannel.configureBlocking(false);
+ // register the ServerSocketChannel with the Selector
+ serverChannel.register(selector, SelectionKey.OP_ACCEPT);
+
+ }
+
+ public void addEvent(Runnable event) {
+ if ( selector != null ) {
+ synchronized (events) {
+ events.add(event);
+ }
+ selector.wakeup();
+ }
+ }
+
+ public void events() {
+ if ( events.size() == 0 ) return;
+ synchronized (events) {
+ Runnable r = null;
+ while ( (events.size() > 0) && (r = (Runnable)events.removeFirst()) != null ) {
+ try {
+ r.run();
+ } catch ( Exception x ) {
+ log.error("",x);
+ }
+ }
+ events.clear();
+ }
+ }
+
+ public static void cancelledKey(SelectionKey key) {
+ try {
+ ObjectReader ka = (ObjectReader)key.attachment();
+ key.cancel();
+ key.channel().close();
+ key.attach(null);
+ if ( ka != null ) ka.finish();
+ } catch (IOException e) {
+ if (log.isDebugEnabled()) log.debug("", e);
+ // Ignore
+ }
+ }
+
+ protected void socketTimeouts() {
+ //timeout
+ Set keys = selector.keys();
+ long now = System.currentTimeMillis();
+ for (Iterator iter = keys.iterator(); iter.hasNext(); ) {
+ SelectionKey key = (SelectionKey) iter.next();
+ try {
+// if (key.interestOps() == SelectionKey.OP_READ) {
+// //only timeout sockets that we are waiting for a read from
+// ObjectReader ka = (ObjectReader) key.attachment();
+// long delta = now - ka.getLastAccess();
+// if (delta > (long) getTimeout()) {
+// cancelledKey(key);
+// }
+// }
+// else
+ if ( key.interestOps() == 0 ) {
+ //check for keys that didn't make it in.
+ ObjectReader ka = (ObjectReader) key.attachment();
+ if ( ka != null ) {
+ long delta = now - ka.getLastAccess();
+ if (delta > (long) getTimeout() && (!ka.isAccessed())) {
+ log.warn("Channel key is registered, but has had no interest ops for the last "+getTimeout()+" ms.");
+ ka.setLastAccess(now);
+ key.interestOps(SelectionKey.OP_READ);
+ }//end if
+ } else {
+ cancelledKey(key);
+ }//end if
+ }//end if
+ }catch ( CancelledKeyException ckx ) {
+ cancelledKey(key);
+ }
+ }
+ }
+
+
+ /**
+ * get data from channel and store in byte array
+ * send it to cluster
+ * @throws IOException
+ * @throws java.nio.channels.ClosedChannelException
+ */
+ protected void listen() throws Exception {
+ if (doListen()) {
+ log.warn("ServerSocketChannel already started");
+ return;
+ }
+
+ setListen(true);
+
+ while (doListen() && selector != null) {
+ // this may block for a long time, upon return the
+ // selected set contains keys of the ready channels
+ try {
+ events();
+ socketTimeouts();
+ int n = selector.select(getTcpSelectorTimeout());
+ if (n == 0) {
+ //there is a good chance that we got here
+ //because the TcpReplicationThread called
+ //selector wakeup().
+ //if that happens, we must ensure that that
+ //thread has enough time to call interestOps
+// synchronized (interestOpsMutex) {
+ //if we got the lock, means there are no
+ //keys trying to register for the
+ //interestOps method
+// }
+ continue; // nothing to do
+ }
+ // get an iterator over the set of selected keys
+ Iterator it = selector.selectedKeys().iterator();
+ // look at each key in the selected set
+ while (it.hasNext()) {
+ SelectionKey key = (SelectionKey) it.next();
+ // Is a new connection coming in?
+ if (key.isAcceptable()) {
+ ServerSocketChannel server = (ServerSocketChannel) key.channel();
+ SocketChannel channel = server.accept();
+ channel.socket().setReceiveBufferSize(getRxBufSize());
+ channel.socket().setSendBufferSize(getTxBufSize());
+ channel.socket().setTcpNoDelay(getTcpNoDelay());
+ channel.socket().setKeepAlive(getSoKeepAlive());
+ channel.socket().setOOBInline(getOoBInline());
+ channel.socket().setReuseAddress(getSoReuseAddress());
+ channel.socket().setSoLinger(getSoLingerOn(),getSoLingerTime());
+ channel.socket().setTrafficClass(getSoTrafficClass());
+ channel.socket().setSoTimeout(getTimeout());
+ Object attach = new ObjectReader(channel);
+ registerChannel(selector,
+ channel,
+ SelectionKey.OP_READ,
+ attach);
+ }
+ // is there data to read on this channel?
+ if (key.isReadable()) {
+ readDataFromSocket(key);
+ } else {
+ key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+ }
+
+ // remove key from selected set, it's been handled
+ it.remove();
+ }
+ } catch (java.nio.channels.ClosedSelectorException cse) {
+ // ignore is normal at shutdown or stop listen socket
+ } catch (java.nio.channels.CancelledKeyException nx) {
+ log.warn("Replication client disconnected, error when polling key. Ignoring client.");
+ } catch (Throwable x) {
+ try {
+ log.error("Unable to process request in NioReceiver", x);
+ }catch ( Throwable tx ) {
+ //in case an out of memory error, will affect the logging framework as well
+ tx.printStackTrace();
+ }
+ }
+
+ }
+ serverChannel.close();
+ if (selector != null)
+ selector.close();
+ }
+
+
+
+ /**
+ * Close Selector.
+ *
+ * @see org.apache.catalina.tribes.transport.ClusterReceiverBase#stopListening()
+ */
+ protected void stopListening() {
+ // Bugzilla 37529: http://issues.apache.org/bugzilla/show_bug.cgi?id=37529
+ setListen(false);
+ if (selector != null) {
+ try {
+ for (int i = 0; i < getMaxThreads(); i++) {
+ selector.wakeup();
+ }
+ selector.close();
+ } catch (Exception x) {
+ log.error("Unable to close cluster receiver selector.", x);
+ } finally {
+ selector = null;
+ }
+ }
+ }
+
+ // ----------------------------------------------------------
+
+ /**
+ * Register the given channel with the given selector for
+ * the given operations of interest
+ */
+ protected void registerChannel(Selector selector,
+ SelectableChannel channel,
+ int ops,
+ Object attach) throws Exception {
+ if (channel == null)return; // could happen
+ // set the new channel non-blocking
+ channel.configureBlocking(false);
+ // register it with the selector
+ channel.register(selector, ops, attach);
+ }
+
+ /**
+ * Start thread and listen
+ */
+ public void run() {
+ try {
+ listen();
+ } catch (Exception x) {
+ log.error("Unable to run replication listener.", x);
+ }
+ }
+
+ // ----------------------------------------------------------
+
+ /**
+ * Sample data handler method for a channel with data ready to read.
+ * @param key A SelectionKey object associated with a channel
+ * determined by the selector to be ready for reading. If the
+ * channel returns an EOF condition, it is closed here, which
+ * automatically invalidates the associated key. The selector
+ * will then de-register the channel on the next select call.
+ */
+ protected void readDataFromSocket(SelectionKey key) throws Exception {
+ NioReplicationThread worker = (NioReplicationThread) getPool().getWorker();
+ if (worker == null) {
+ // No threads available, do nothing, the selection
+ // loop will keep calling this method until a
+ // thread becomes available.
+ // FIXME: This design could be improved.
+ if (log.isDebugEnabled())
+ log.debug("No TcpReplicationThread available");
+ } else {
+ // invoking this wakes up the worker thread then returns
+ worker.serviceChannel(key);
+ }
+ }
+
+
+}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java?rev=418516&r1=418515&r2=418516&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java Sat Jul 1 15:52:11 2006
@@ -1,244 +1,256 @@
-/*
- * Copyright 1999,2004 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.catalina.tribes.transport.nio;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-
-import org.apache.catalina.tribes.io.ObjectReader;
-import org.apache.catalina.tribes.transport.Constants;
-import org.apache.catalina.tribes.transport.WorkerThread;
-import org.apache.catalina.tribes.ChannelMessage;
-import org.apache.catalina.tribes.io.ListenCallback;
-import org.apache.catalina.tribes.io.ChannelData;
-import org.apache.catalina.tribes.io.BufferPool;
-import java.nio.channels.CancelledKeyException;
-
-/**
- * A worker thread class which can drain channels and echo-back the input. Each
- * instance is constructed with a reference to the owning thread pool object.
- * When started, the thread loops forever waiting to be awakened to service the
- * channel associated with a SelectionKey object. The worker is tasked by
- * calling its serviceChannel() method with a SelectionKey object. The
- * serviceChannel() method stores the key reference in the thread object then
- * calls notify() to wake it up. When the channel has been drained, the worker
- * thread returns itself to its parent pool.
- *
- * @author Filip Hanik
- *
- * @version $Revision: 378050 $, $Date: 2006-02-15 12:30:02 -0600 (Wed, 15 Feb 2006) $
- */
-public class NioReplicationThread extends WorkerThread {
-
- private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog( NioReplicationThread.class );
- private ByteBuffer buffer = null;
- private SelectionKey key;
- private int rxBufSize;
- private NioReceiver receiver;
- public NioReplicationThread (ListenCallback callback, NioReceiver receiver)
- {
- super(callback);
- this.receiver = receiver;
- }
-
- // loop forever waiting for work to do
- public synchronized void run()
- {
- if ( (getOptions() & OPTION_DIRECT_BUFFER) == OPTION_DIRECT_BUFFER ) {
- buffer = ByteBuffer.allocateDirect(getRxBufSize());
- }else {
- buffer = ByteBuffer.allocate (getRxBufSize());
- }
- while (isDoRun()) {
- try {
- // sleep and release object lock
- this.wait();
- } catch (InterruptedException e) {
- if(log.isInfoEnabled())
- log.info("TCP worker thread interrupted in cluster",e);
- // clear interrupt status
- Thread.interrupted();
- }
- if (key == null) {
- continue; // just in case
- }
- try {
- drainChannel (key);
- } catch (Exception e) {
- //this is common, since the sockets on the other
- //end expire after a certain time.
- if ( e instanceof CancelledKeyException ) {
- //do nothing
- } else if ( e instanceof IOException ) {
- //dont spew out stack traces for IO exceptions unless debug is enabled.
- if (log.isDebugEnabled()) log.debug ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed.", e);
- else log.warn ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed.");
- } else if ( log.isErrorEnabled() ) {
- //this is a real error, log it.
- log.error("Exception caught in TcpReplicationThread.drainChannel.",e);
- }
-
- // close channel and nudge selector
- try {
- key.channel().close();
- } catch (IOException ex) {
- log.error("Unable to close channel.",ex);
- }
- key.selector().wakeup();
- }
- key = null;
- // done, ready for more, return to pool
- getPool().returnWorker (this);
- }
- }
-
- /**
- * Called to initiate a unit of work by this worker thread
- * on the provided SelectionKey object. This method is
- * synchronized, as is the run() method, so only one key
- * can be serviced at a given time.
- * Before waking the worker thread, and before returning
- * to the main selection loop, this key's interest set is
- * updated to remove OP_READ. This will cause the selector
- * to ignore read-readiness for this channel while the
- * worker thread is servicing it.
- */
- public synchronized void serviceChannel (SelectionKey key) {
- this.key = key;
- key.interestOps (key.interestOps() & (~SelectionKey.OP_READ));
- key.interestOps (key.interestOps() & (~SelectionKey.OP_WRITE));
- this.notify(); // awaken the thread
- }
-
- /**
- * The actual code which drains the channel associated with
- * the given key. This method assumes the key has been
- * modified prior to invocation to turn off selection
- * interest in OP_READ. When this method completes it
- * re-enables OP_READ and calls wakeup() on the selector
- * so the selector will resume watching this channel.
- */
- protected void drainChannel (final SelectionKey key) throws Exception {
- SocketChannel channel = (SocketChannel) key.channel();
- int count;
- buffer.clear(); // make buffer empty
- ObjectReader reader = (ObjectReader)key.attachment();
- reader.setLastAccess(System.currentTimeMillis());
- // loop while data available, channel is non-blocking
- while ((count = channel.read (buffer)) > 0) {
- buffer.flip(); // make buffer readable
- if ( buffer.hasArray() )
- reader.append(buffer.array(),0,count,false);
- else
- reader.append(buffer,count,false);
- buffer.clear(); // make buffer empty
- }
-
- int pkgcnt = reader.count();
-
- if ( pkgcnt > 0 ) {
- ChannelMessage[] msgs = reader.execute();
- for ( int i=0; i<msgs.length; i++ ) {
- /**
- * Use send ack here if you want to ack the request to the remote
- * server before completing the request
- * This is considered an asynchronized request
- */
- if (ChannelData.sendAckAsync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND);
- try {
- //process the message
- getCallback().messageDataReceived(msgs[i]);
- /**
- * Use send ack here if you want the request to complete on this
- * server before sending the ack to the remote server
- * This is considered a synchronized request
- */
- if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND);
- }catch ( Exception e ) {
- log.error("Processing of cluster message failed.",e);
- if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND);
- }
- if ( getUseBufferPool() ) {
- BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage());
- msgs[i].setMessage(null);
- }
- }
- }
-
-
-
-
- if (count < 0) {
- // close channel on EOF, invalidates the key
- if ( log.isDebugEnabled() ) log.debug("Channel closed on the remote end, disconnecting");
- channel.close();
- return;
- }
-
- //acquire the interestOps mutex
- Runnable r = new Runnable() {
- public void run() {
- try {
- if (key.isValid()) {
- // cycle the selector so this key is active again
- key.selector().wakeup();
- // resume interest in OP_READ, OP_WRITE
- int resumeOps = key.interestOps() | SelectionKey.OP_READ;
- key.interestOps(resumeOps);
- }
- } catch (Exception x) {
- try {
- key.selector().close();
- } catch (Exception ignore) {}
- log.error("Unable to cycle the selector, connection disconnected?", x);
- }
- }
- };
- receiver.addEvent(r);
-
- }
-
-
-
-
-
- /**
- * send a reply-acknowledgement (6,2,3)
- * @param key
- * @param channel
- */
- protected void sendAck(SelectionKey key, SocketChannel channel, byte[] command) {
-
- try {
- channel.write(ByteBuffer.wrap(command));
- if (log.isTraceEnabled()) {
- log.trace("ACK sent to " + channel.socket().getPort());
- }
- } catch ( java.io.IOException x ) {
- log.warn("Unable to send ACK back through channel, channel disconnected?: "+x.getMessage());
- }
- }
-
- public void setRxBufSize(int rxBufSize) {
- this.rxBufSize = rxBufSize;
- }
-
- public int getRxBufSize() {
- return rxBufSize;
- }
-}
+/*
+ * Copyright 1999,2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.transport.nio;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+import org.apache.catalina.tribes.io.ObjectReader;
+import org.apache.catalina.tribes.transport.Constants;
+import org.apache.catalina.tribes.transport.WorkerThread;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.io.ListenCallback;
+import org.apache.catalina.tribes.io.ChannelData;
+import org.apache.catalina.tribes.io.BufferPool;
+import java.nio.channels.CancelledKeyException;
+
+/**
+ * A worker thread class which can drain channels and echo-back the input. Each
+ * instance is constructed with a reference to the owning thread pool object.
+ * When started, the thread loops forever waiting to be awakened to service the
+ * channel associated with a SelectionKey object. The worker is tasked by
+ * calling its serviceChannel() method with a SelectionKey object. The
+ * serviceChannel() method stores the key reference in the thread object then
+ * calls notify() to wake it up. When the channel has been drained, the worker
+ * thread returns itself to its parent pool.
+ *
+ * @author Filip Hanik
+ *
+ * @version $Revision: 378050 $, $Date: 2006-02-15 12:30:02 -0600 (Wed, 15 Feb 2006) $
+ */
+public class NioReplicationThread extends WorkerThread {
+
+ private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog( NioReplicationThread.class );
+ private ByteBuffer buffer = null;
+ private SelectionKey key;
+ private int rxBufSize;
+ private NioReceiver receiver;
+ public NioReplicationThread (ListenCallback callback, NioReceiver receiver)
+ {
+ super(callback);
+ this.receiver = receiver;
+ }
+
+ // loop forever waiting for work to do
+ public synchronized void run()
+ {
+ if ( (getOptions() & OPTION_DIRECT_BUFFER) == OPTION_DIRECT_BUFFER ) {
+ buffer = ByteBuffer.allocateDirect(getRxBufSize());
+ }else {
+ buffer = ByteBuffer.allocate (getRxBufSize());
+ }
+ while (isDoRun()) {
+ try {
+ // sleep and release object lock
+ this.wait();
+ } catch (InterruptedException e) {
+ if(log.isInfoEnabled())
+ log.info("TCP worker thread interrupted in cluster",e);
+ // clear interrupt status
+ Thread.interrupted();
+ }
+ if (key == null) {
+ continue; // just in case
+ }
+ try {
+ drainChannel (key);
+ } catch (Exception e) {
+ //this is common, since the sockets on the other
+ //end expire after a certain time.
+ if ( e instanceof CancelledKeyException ) {
+ //do nothing
+ } else if ( e instanceof IOException ) {
+ //dont spew out stack traces for IO exceptions unless debug is enabled.
+ if (log.isDebugEnabled()) log.debug ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed.", e);
+ else log.warn ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed.");
+ } else if ( log.isErrorEnabled() ) {
+ //this is a real error, log it.
+ log.error("Exception caught in TcpReplicationThread.drainChannel.",e);
+ }
+ cancelKey(key);
+ }
+ key = null;
+ // done, ready for more, return to pool
+ getPool().returnWorker (this);
+ }
+ }
+
+ /**
+ * Called to initiate a unit of work by this worker thread
+ * on the provided SelectionKey object. This method is
+ * synchronized, as is the run() method, so only one key
+ * can be serviced at a given time.
+ * Before waking the worker thread, and before returning
+ * to the main selection loop, this key's interest set is
+ * updated to remove OP_READ. This will cause the selector
+ * to ignore read-readiness for this channel while the
+ * worker thread is servicing it.
+ */
+ public synchronized void serviceChannel (SelectionKey key) {
+ this.key = key;
+ key.interestOps (key.interestOps() & (~SelectionKey.OP_READ));
+ key.interestOps (key.interestOps() & (~SelectionKey.OP_WRITE));
+ this.notify(); // awaken the thread
+ }
+
+ /**
+ * The actual code which drains the channel associated with
+ * the given key. This method assumes the key has been
+ * modified prior to invocation to turn off selection
+ * interest in OP_READ. When this method completes it
+ * re-enables OP_READ and calls wakeup() on the selector
+ * so the selector will resume watching this channel.
+ */
+ protected void drainChannel (final SelectionKey key) throws Exception {
+ SocketChannel channel = (SocketChannel) key.channel();
+ int count;
+ buffer.clear(); // make buffer empty
+ ObjectReader reader = (ObjectReader)key.attachment();
+ reader.setLastAccess(System.currentTimeMillis());
+ try {
+ reader.access();
+
+ // loop while data available, channel is non-blocking
+ while ((count = channel.read (buffer)) > 0) {
+ buffer.flip(); // make buffer readable
+ if ( buffer.hasArray() )
+ reader.append(buffer.array(),0,count,false);
+ else
+ reader.append(buffer,count,false);
+ buffer.clear(); // make buffer empty
+ }
+
+ int pkgcnt = reader.count();
+
+ if ( pkgcnt > 0 ) {
+ ChannelMessage[] msgs = reader.execute();
+ for ( int i=0; i<msgs.length; i++ ) {
+ /**
+ * Use send ack here if you want to ack the request to the remote
+ * server before completing the request
+ * This is considered an asynchronized request
+ */
+ if (ChannelData.sendAckAsync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND);
+ try {
+ //process the message
+ getCallback().messageDataReceived(msgs[i]);
+ /**
+ * Use send ack here if you want the request to complete on this
+ * server before sending the ack to the remote server
+ * This is considered a synchronized request
+ */
+ if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND);
+ }catch ( Exception e ) {
+ log.error("Processing of cluster message failed.",e);
+ if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND);
+ }
+ if ( getUseBufferPool() ) {
+ BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage());
+ msgs[i].setMessage(null);
+ }
+ }
+ }
+ } finally {
+ reader.finish();
+ }
+
+
+ if (count < 0) {
+ // close channel on EOF, invalidates the key
+ if ( log.isDebugEnabled() ) log.debug("Channel closed on the remote end, disconnecting");
+ cancelKey(key);
+ return;
+ }
+
+ //register our OP_READ interest
+ Runnable r = new Runnable() {
+ public void run() {
+ try {
+ if (key.isValid()) {
+ // cycle the selector so this key is active again
+ key.selector().wakeup();
+ // resume interest in OP_READ, OP_WRITE
+ int resumeOps = key.interestOps() | SelectionKey.OP_READ;
+ key.interestOps(resumeOps);
+ }
+ } catch (CancelledKeyException ckx ) {
+ NioReceiver.cancelledKey(key);
+ } catch (Exception x) {
+ try {
+ key.selector().close();
+ } catch (Exception ignore) {}
+ log.error("Unable to cycle the selector, connection disconnected?", x);
+ }
+ }
+ };
+ receiver.addEvent(r);
+
+ }
+
+ private void cancelKey(final SelectionKey key) {
+ Runnable cx = new Runnable() {
+ public void run() {
+ NioReceiver.cancelledKey(key);
+ }
+ };
+ receiver.addEvent(cx);
+ }
+
+
+
+
+
+ /**
+ * send a reply-acknowledgement (6,2,3)
+ * @param key
+ * @param channel
+ */
+ protected void sendAck(SelectionKey key, SocketChannel channel, byte[] command) {
+
+ try {
+ ByteBuffer buf = ByteBuffer.wrap(command);
+ int total = 0;
+ while ( total < command.length ) {
+ total += channel.write(buf);
+ }
+ if (log.isTraceEnabled()) {
+ log.trace("ACK sent to " + channel.socket().getPort());
+ }
+ } catch ( java.io.IOException x ) {
+ log.warn("Unable to send ACK back through channel, channel disconnected?: "+x.getMessage());
+ }
+ }
+
+ public void setRxBufSize(int rxBufSize) {
+ this.rxBufSize = rxBufSize;
+ }
+
+ public int getRxBufSize() {
+ return rxBufSize;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org