You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/07/02 00:52:13 UTC

svn commit: r418516 - in /tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes: io/ChannelData.java io/ObjectReader.java io/XByteBuffer.java transport/nio/NioReceiver.java transport/nio/NioReplicationThread.java

Author: fhanik
Date: Sat Jul  1 15:52:11 2006
New Revision: 418516

URL: http://svn.apache.org/viewvc?rev=418516&view=rev
Log:
Major improvements; there seems to be an error in the thread handling in NioReceiver and in the hand-off to the worker thread.

Modified:
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java?rev=418516&r1=418515&r2=418516&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java Sat Jul  1 15:52:11 2006
@@ -232,6 +232,7 @@
         data.setAddress(MemberImpl.getMember(addr));
         offset += addr.length; //addr data
         int xsize = XByteBuffer.toInt(xbuf.getBytesDirect(),offset);
+        offset += 4; //xsize length
         System.arraycopy(xbuf.getBytesDirect(),offset,xbuf.getBytesDirect(),0,xsize);
         xbuf.setLength(xsize);
         data.message = xbuf;

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java?rev=418516&r1=418515&r2=418516&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java Sat Jul  1 15:52:11 2006
@@ -41,6 +41,8 @@
     private XByteBuffer buffer;
     
     protected long lastAccess = System.currentTimeMillis();
+    
+    protected boolean accessed = false;
 
     /**
      * Creates an <code>ObjectReader</code> for a TCP NIO socket channel
@@ -62,6 +64,18 @@
             log.warn("Unable to retrieve the socket receiver buffer size, setting to default 43800 bytes.");
             this.buffer = new XByteBuffer(43800,true);
         }
+    }
+    
+    public synchronized void access() {
+        this.accessed = true;
+    }
+    
+    public synchronized void finish() {
+        this.accessed = false;
+    }
+    
+    public boolean isAccessed() {
+        return this.accessed;
     }
 
     /**

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java?rev=418516&r1=418515&r2=418516&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java Sat Jul  1 15:52:11 2006
@@ -317,6 +317,7 @@
         if (psize == 0) throw new java.lang.IllegalStateException("No package exists in XByteBuffer");
         int size = toInt(buf, START_DATA.length);
         XByteBuffer xbuf = BufferPool.getBufferPool().getBuffer(size,false);
+        xbuf.setLength(size);
         System.arraycopy(buf, START_DATA.length + 4, xbuf.getBytesDirect(), 0, size);
         if (clearFromBuffer) {
             int totalsize = START_DATA.length + 4 + size + END_DATA.length;

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java?rev=418516&r1=418515&r2=418516&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java Sat Jul  1 15:52:11 2006
@@ -1,321 +1,377 @@
-/*
- * Copyright 1999,2004-2005 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.catalina.tribes.transport.nio;
-
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.nio.channels.SelectableChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.util.Iterator;
-
-import org.apache.catalina.tribes.ChannelReceiver;
-import org.apache.catalina.tribes.io.ListenCallback;
-import org.apache.catalina.tribes.io.ObjectReader;
-import org.apache.catalina.tribes.transport.Constants;
-import org.apache.catalina.tribes.transport.ReceiverBase;
-import org.apache.catalina.tribes.transport.ThreadPool;
-import org.apache.catalina.tribes.transport.WorkerThread;
-import org.apache.catalina.tribes.util.StringManager;
-import java.util.LinkedList;
-
-/**
- * @author Filip Hanik
- * @version $Revision: 379904 $ $Date: 2006-02-22 15:16:25 -0600 (Wed, 22 Feb 2006) $
- */
-public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiver, ListenCallback {
-
-    protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(NioReceiver.class);
-
-    /**
-     * The string manager for this package.
-     */
-    protected StringManager sm = StringManager.getManager(Constants.Package);
-
-    /**
-     * The descriptive information about this implementation.
-     */
-    private static final String info = "NioReceiver/1.0";
-
-    private Selector selector = null;
-    private ServerSocketChannel serverChannel = null;
-
-    protected LinkedList events = new LinkedList();
-//    private Object interestOpsMutex = new Object();
-
-    public NioReceiver() {
-    }
-
-    /**
-     * Return descriptive information about this implementation and the
-     * corresponding version number, in the format
-     * <code>&lt;description&gt;/&lt;version&gt;</code>.
-     */
-    public String getInfo() {
-        return (info);
-    }
-
-//    public Object getInterestOpsMutex() {
-//        return interestOpsMutex;
-//    }
-
-    public void stop() {
-        this.stopListening();
-    }
-
-    /**
-     * start cluster receiver
-     * @throws Exception
-     * @see org.apache.catalina.tribes.ClusterReceiver#start()
-     */
-    public void start() throws IOException {
-        try {
-//            setPool(new ThreadPool(interestOpsMutex, getMaxThreads(),getMinThreads(),this));
-            setPool(new ThreadPool(getMaxThreads(),getMinThreads(),this));
-        } catch (Exception x) {
-            log.fatal("ThreadPool can initilzed. Listener not started", x);
-            if ( x instanceof IOException ) throw (IOException)x;
-            else throw new IOException(x.getMessage());
-        }
-        try {
-            getBind();
-            bind();
-            Thread t = new Thread(this, "NioReceiver");
-            t.setDaemon(true);
-            t.start();
-        } catch (Exception x) {
-            log.fatal("Unable to start cluster receiver", x);
-            if ( x instanceof IOException ) throw (IOException)x;
-            else throw new IOException(x.getMessage());
-        }
-    }
-    
-    public WorkerThread getWorkerThread() {
-        NioReplicationThread thread = new NioReplicationThread(this,this);
-        thread.setUseBufferPool(this.getUseBufferPool());
-        thread.setRxBufSize(getRxBufSize());
-        thread.setOptions(getWorkerThreadOptions());
-        return thread;
-    }
-    
-    
-    
-    protected void bind() throws IOException {
-        // allocate an unbound server socket channel
-        serverChannel = ServerSocketChannel.open();
-        // Get the associated ServerSocket to bind it with
-        ServerSocket serverSocket = serverChannel.socket();
-        // create a new Selector for use below
-        selector = Selector.open();
-        // set the port the server channel will listen to
-        //serverSocket.bind(new InetSocketAddress(getBind(), getTcpListenPort()));
-        bind(serverSocket,getTcpListenPort(),getAutoBind());
-        // set non-blocking mode for the listening socket
-        serverChannel.configureBlocking(false);
-        // register the ServerSocketChannel with the Selector
-        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
-        
-    }
-    
-    public void addEvent(Runnable event) {
-        if ( selector != null ) {
-            synchronized (events) {
-                events.add(event);
-            }
-            selector.wakeup();
-        }
-    }
-
-    public void events() {
-        if ( events.size() == 0 ) return;
-        synchronized (events) {
-            Runnable r = null;
-            while ( (events.size() > 0) && (r = (Runnable)events.removeFirst()) != null ) {
-                try {
-                    r.run();
-                } catch ( Exception x ) {
-                    log.error("",x);
-                }
-            }
-            events.clear();
-        }
-    }
-
-    /**
-     * get data from channel and store in byte array
-     * send it to cluster
-     * @throws IOException
-     * @throws java.nio.channels.ClosedChannelException
-     */
-    protected void listen() throws Exception {
-        if (doListen()) {
-            log.warn("ServerSocketChannel already started");
-            return;
-        }
-        
-        setListen(true);
-
-        while (doListen() && selector != null) {
-            // this may block for a long time, upon return the
-            // selected set contains keys of the ready channels
-            try {
-                events();
-                int n = selector.select(getTcpSelectorTimeout());
-                if (n == 0) {
-                    //there is a good chance that we got here
-                    //because the TcpReplicationThread called
-                    //selector wakeup().
-                    //if that happens, we must ensure that that
-                    //thread has enough time to call interestOps
-//                    synchronized (interestOpsMutex) {
-                        //if we got the lock, means there are no
-                        //keys trying to register for the
-                        //interestOps method
-//                    }
-                    continue; // nothing to do
-                }
-                // get an iterator over the set of selected keys
-                Iterator it = selector.selectedKeys().iterator();
-                // look at each key in the selected set
-                while (it.hasNext()) {
-                    SelectionKey key = (SelectionKey) it.next();
-                    // Is a new connection coming in?
-                    if (key.isAcceptable()) {
-                        ServerSocketChannel server = (ServerSocketChannel) key.channel();
-                        SocketChannel channel = server.accept();
-                        channel.socket().setReceiveBufferSize(getRxBufSize());
-                        channel.socket().setSendBufferSize(getTxBufSize());
-                        channel.socket().setTcpNoDelay(getTcpNoDelay());
-                        channel.socket().setKeepAlive(getSoKeepAlive());
-                        channel.socket().setOOBInline(getOoBInline());
-                        channel.socket().setReuseAddress(getSoReuseAddress());
-                        channel.socket().setSoLinger(getSoLingerOn(),getSoLingerTime());
-                        channel.socket().setTrafficClass(getSoTrafficClass());
-                        channel.socket().setSoTimeout(getTimeout());
-                        Object attach = new ObjectReader(channel);
-                        registerChannel(selector,
-                                        channel,
-                                        SelectionKey.OP_READ,
-                                        attach);
-                    }
-                    // is there data to read on this channel?
-                    if (key.isReadable()) {
-                        readDataFromSocket(key);
-                    } else {
-                        key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
-                    }
-
-                    // remove key from selected set, it's been handled
-                    it.remove();
-                }
-            } catch (java.nio.channels.ClosedSelectorException cse) {
-                // ignore is normal at shutdown or stop listen socket
-            } catch (java.nio.channels.CancelledKeyException nx) {
-                log.warn("Replication client disconnected, error when polling key. Ignoring client.");
-            } catch (Throwable x) {
-                try {
-                    log.error("Unable to process request in NioReceiver", x);
-                }catch ( Throwable tx ) {
-                    tx.printStackTrace();
-                }
-            }
-
-        }
-        serverChannel.close();
-        if (selector != null)
-            selector.close();
-    }
-
-    /**
-     * Close Selector.
-     *
-     * @see org.apache.catalina.tribes.transport.ClusterReceiverBase#stopListening()
-     */
-    protected void stopListening() {
-        // Bugzilla 37529: http://issues.apache.org/bugzilla/show_bug.cgi?id=37529
-        setListen(false);
-        if (selector != null) {
-            try {
-                for (int i = 0; i < getMaxThreads(); i++) {
-                    selector.wakeup();
-                }
-                selector.close();
-            } catch (Exception x) {
-                log.error("Unable to close cluster receiver selector.", x);
-            } finally {
-                selector = null;
-            }
-        }
-    }
-
-    // ----------------------------------------------------------
-
-    /**
-     * Register the given channel with the given selector for
-     * the given operations of interest
-     */
-    protected void registerChannel(Selector selector,
-                                   SelectableChannel channel,
-                                   int ops,
-                                   Object attach) throws Exception {
-        if (channel == null)return; // could happen
-        // set the new channel non-blocking
-        channel.configureBlocking(false);
-        // register it with the selector
-        channel.register(selector, ops, attach);
-    }
-
-    /**
-     * Start thread and listen
-     */
-    public void run() {
-        try {
-            listen();
-        } catch (Exception x) {
-            log.error("Unable to run replication listener.", x);
-        }
-    }
-
-    // ----------------------------------------------------------
-
-    /**
-     * Sample data handler method for a channel with data ready to read.
-     * @param key A SelectionKey object associated with a channel
-     *  determined by the selector to be ready for reading.  If the
-     *  channel returns an EOF condition, it is closed here, which
-     *  automatically invalidates the associated key.  The selector
-     *  will then de-register the channel on the next select call.
-     */
-    protected void readDataFromSocket(SelectionKey key) throws Exception {
-        NioReplicationThread worker = (NioReplicationThread) getPool().getWorker();
-        if (worker == null) {
-            // No threads available, do nothing, the selection
-            // loop will keep calling this method until a
-            // thread becomes available.
-            // FIXME: This design could be improved.
-            if (log.isDebugEnabled())
-                log.debug("No TcpReplicationThread available");
-        } else {
-            // invoking this wakes up the worker thread then returns
-            worker.serviceChannel(key);
-        }
-    }
-
-
-}
+/*
+ * Copyright 1999,2004-2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.transport.nio;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+
+import org.apache.catalina.tribes.ChannelReceiver;
+import org.apache.catalina.tribes.io.ListenCallback;
+import org.apache.catalina.tribes.io.ObjectReader;
+import org.apache.catalina.tribes.transport.Constants;
+import org.apache.catalina.tribes.transport.ReceiverBase;
+import org.apache.catalina.tribes.transport.ThreadPool;
+import org.apache.catalina.tribes.transport.WorkerThread;
+import org.apache.catalina.tribes.util.StringManager;
+import java.util.LinkedList;
+import java.util.Set;
+import java.nio.channels.CancelledKeyException;
+
+/**
+ * @author Filip Hanik
+ * @version $Revision: 379904 $ $Date: 2006-02-22 15:16:25 -0600 (Wed, 22 Feb 2006) $
+ */
+public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiver, ListenCallback {
+
+    protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(NioReceiver.class);
+
+    /**
+     * The string manager for this package.
+     */
+    protected StringManager sm = StringManager.getManager(Constants.Package);
+
+    /**
+     * The descriptive information about this implementation.
+     */
+    private static final String info = "NioReceiver/1.0";
+
+    private Selector selector = null;
+    private ServerSocketChannel serverChannel = null;
+
+    protected LinkedList events = new LinkedList();
+//    private Object interestOpsMutex = new Object();
+
+    public NioReceiver() {
+    }
+
+    /**
+     * Return descriptive information about this implementation and the
+     * corresponding version number, in the format
+     * <code>&lt;description&gt;/&lt;version&gt;</code>.
+     */
+    public String getInfo() {
+        return (info);
+    }
+
+//    public Object getInterestOpsMutex() {
+//        return interestOpsMutex;
+//    }
+
+    public void stop() {
+        this.stopListening();
+    }
+
+    /**
+     * Creates the worker thread pool, binds the listen socket and starts the daemon "NioReceiver" thread.
+     * @throws IOException if either step fails; non-IO failures are rethrown as IOException(message)
+     * @see org.apache.catalina.tribes.ClusterReceiver#start()
+     */
+    public void start() throws IOException {
+        try {
+//            setPool(new ThreadPool(interestOpsMutex, getMaxThreads(),getMinThreads(),this));
+            setPool(new ThreadPool(getMaxThreads(),getMinThreads(),this));
+        } catch (Exception x) {
+            log.fatal("ThreadPool can initilzed. Listener not started", x);
+            if ( x instanceof IOException ) throw (IOException)x;
+            else throw new IOException(x.getMessage());
+        }
+        try {
+            getBind();
+            bind();
+            Thread t = new Thread(this, "NioReceiver");
+            t.setDaemon(true);
+            t.start();
+        } catch (Exception x) {
+            log.fatal("Unable to start cluster receiver", x);
+            if ( x instanceof IOException ) throw (IOException)x;
+            else throw new IOException(x.getMessage());
+        }
+    }
+    
+    public WorkerThread getWorkerThread() {
+        NioReplicationThread thread = new NioReplicationThread(this,this);
+        thread.setUseBufferPool(this.getUseBufferPool());
+        thread.setRxBufSize(getRxBufSize());
+        thread.setOptions(getWorkerThreadOptions());
+        return thread;
+    }
+    
+    
+    
+    protected void bind() throws IOException {
+        // allocate an unbound server socket channel
+        serverChannel = ServerSocketChannel.open();
+        // Get the associated ServerSocket to bind it with
+        ServerSocket serverSocket = serverChannel.socket();
+        // create a new Selector for use below
+        selector = Selector.open();
+        // set the port the server channel will listen to
+        //serverSocket.bind(new InetSocketAddress(getBind(), getTcpListenPort()));
+        bind(serverSocket,getTcpListenPort(),getAutoBind());
+        // set non-blocking mode for the listening socket
+        serverChannel.configureBlocking(false);
+        // register the ServerSocketChannel with the Selector
+        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
+        
+    }
+    
+    public void addEvent(Runnable event) {
+        if ( selector != null ) {
+            synchronized (events) {
+                events.add(event);
+            }
+            selector.wakeup();
+        }
+    }
+
+    public void events() {
+        if ( events.size() == 0 ) return;
+        synchronized (events) {
+            Runnable r = null;
+            while ( (events.size() > 0) && (r = (Runnable)events.removeFirst()) != null ) {
+                try {
+                    r.run();
+                } catch ( Exception x ) {
+                    log.error("",x);
+                }
+            }
+            events.clear();
+        }
+    }
+    
+    public static void cancelledKey(SelectionKey key) {
+        try {
+            ObjectReader ka = (ObjectReader)key.attachment();
+            key.cancel();
+            key.channel().close();
+            key.attach(null);
+            if ( ka != null ) ka.finish();
+        } catch (IOException e) {
+            if (log.isDebugEnabled()) log.debug("", e);
+            // Ignored on purpose. NOTE(review): if close() throws, attach(null) and ka.finish() above are skipped.
+        }
+    }
+    
+    protected void socketTimeouts() {
+        //sweep every key registered with the selector; the read-timeout check below is commented out
+        Set keys = selector.keys();
+        long now = System.currentTimeMillis();
+        for (Iterator iter = keys.iterator(); iter.hasNext(); ) {
+            SelectionKey key = (SelectionKey) iter.next();
+            try {
+//                if (key.interestOps() == SelectionKey.OP_READ) {
+//                    //only timeout sockets that we are waiting for a read from
+//                    ObjectReader ka = (ObjectReader) key.attachment();
+//                    long delta = now - ka.getLastAccess();
+//                    if (delta > (long) getTimeout()) {
+//                        cancelledKey(key);
+//                    }
+//                }
+//                else
+                if ( key.interestOps() == 0 ) {
+                    //key has no interest ops: if idle past getTimeout() and not flagged accessed, re-arm it for OP_READ
+                    ObjectReader ka = (ObjectReader) key.attachment();
+                    if ( ka != null ) {
+                        long delta = now - ka.getLastAccess();
+                        if (delta > (long) getTimeout() && (!ka.isAccessed())) {
+                            log.warn("Channel key is registered, but has had no interest ops for the last "+getTimeout()+" ms.");
+                            ka.setLastAccess(now);
+                            key.interestOps(SelectionKey.OP_READ);
+                        }//end if
+                    } else {
+                        cancelledKey(key);
+                    }//end if
+                }//end if
+            }catch ( CancelledKeyException ckx ) {
+                cancelledKey(key);
+            }
+        }
+    }
+
+
+    /**
+     * Selector loop: each pass runs queued events and socketTimeouts(), then select()s, accepts new
+     * connections and passes readable keys to readDataFromSocket(); on exit closes the server channel and selector.
+     * @throws IOException
+     * @throws java.nio.channels.ClosedChannelException
+     */
+    protected void listen() throws Exception {
+        if (doListen()) {
+            log.warn("ServerSocketChannel already started");
+            return;
+        }
+        
+        setListen(true);
+
+        while (doListen() && selector != null) {
+            // this may block for a long time, upon return the
+            // selected set contains keys of the ready channels
+            try {
+                events();
+                socketTimeouts();
+                int n = selector.select(getTcpSelectorTimeout());
+                if (n == 0) {
+                    //there is a good chance that we got here
+                    //because the TcpReplicationThread called
+                    //selector wakeup().
+                    //if that happens, we must ensure that that
+                    //thread has enough time to call interestOps
+//                    synchronized (interestOpsMutex) {
+                        //if we got the lock, means there are no
+                        //keys trying to register for the
+                        //interestOps method
+//                    }
+                    continue; // nothing to do
+                }
+                // get an iterator over the set of selected keys
+                Iterator it = selector.selectedKeys().iterator();
+                // look at each key in the selected set
+                while (it.hasNext()) {
+                    SelectionKey key = (SelectionKey) it.next();
+                    // Is a new connection coming in?
+                    if (key.isAcceptable()) {
+                        ServerSocketChannel server = (ServerSocketChannel) key.channel();
+                        SocketChannel channel = server.accept();
+                        channel.socket().setReceiveBufferSize(getRxBufSize());
+                        channel.socket().setSendBufferSize(getTxBufSize());
+                        channel.socket().setTcpNoDelay(getTcpNoDelay());
+                        channel.socket().setKeepAlive(getSoKeepAlive());
+                        channel.socket().setOOBInline(getOoBInline());
+                        channel.socket().setReuseAddress(getSoReuseAddress());
+                        channel.socket().setSoLinger(getSoLingerOn(),getSoLingerTime());
+                        channel.socket().setTrafficClass(getSoTrafficClass());
+                        channel.socket().setSoTimeout(getTimeout());
+                        Object attach = new ObjectReader(channel);
+                        registerChannel(selector,
+                                        channel,
+                                        SelectionKey.OP_READ,
+                                        attach);
+                    }
+                    // is there data to read on this channel?
+                    if (key.isReadable()) {
+                        readDataFromSocket(key);
+                    } else {
+                        key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+                    }
+
+                    // remove key from selected set, it's been handled
+                    it.remove();
+                }
+            } catch (java.nio.channels.ClosedSelectorException cse) {
+                // ignore is normal at shutdown or stop listen socket
+            } catch (java.nio.channels.CancelledKeyException nx) {
+                log.warn("Replication client disconnected, error when polling key. Ignoring client.");
+            } catch (Throwable x) {
+                try {
+                    log.error("Unable to process request in NioReceiver", x);
+                }catch ( Throwable tx ) {
+                    //in case an out of memory error, will affect the logging framework as well
+                    tx.printStackTrace();
+                }
+            }
+
+        }
+        serverChannel.close();
+        if (selector != null)
+            selector.close();
+    }
+
+    
+
+    /**
+     * Clears the listen flag, then wakes the selector getMaxThreads() times, closes it and nulls the field.
+     *
+     * @see org.apache.catalina.tribes.transport.ClusterReceiverBase#stopListening()
+     */
+    protected void stopListening() {
+        // Bugzilla 37529: http://issues.apache.org/bugzilla/show_bug.cgi?id=37529
+        setListen(false);
+        if (selector != null) {
+            try {
+                for (int i = 0; i < getMaxThreads(); i++) {
+                    selector.wakeup();
+                }
+                selector.close();
+            } catch (Exception x) {
+                log.error("Unable to close cluster receiver selector.", x);
+            } finally {
+                selector = null;
+            }
+        }
+    }
+
+    // ----------------------------------------------------------
+
+    /**
+     * Register the given channel with the given selector for
+     * the given operations of interest
+     */
+    protected void registerChannel(Selector selector,
+                                   SelectableChannel channel,
+                                   int ops,
+                                   Object attach) throws Exception {
+        if (channel == null)return; // could happen
+        // set the new channel non-blocking
+        channel.configureBlocking(false);
+        // register it with the selector
+        channel.register(selector, ops, attach);
+    }
+
+    /**
+     * Start thread and listen
+     */
+    public void run() {
+        try {
+            listen();
+        } catch (Exception x) {
+            log.error("Unable to run replication listener.", x);
+        }
+    }
+
+    // ----------------------------------------------------------
+
+    /**
+     * Hands a readable key to a worker thread taken from the pool.
+     * @param key A SelectionKey object associated with a channel
+     *  determined by the selector to be ready for reading.  If the
+     *  pool has no free worker, nothing is done (only a debug log);
+     *  otherwise worker.serviceChannel(key) is invoked. No channel
+     *  is read or closed in this method itself.
+     */
+    protected void readDataFromSocket(SelectionKey key) throws Exception {
+        NioReplicationThread worker = (NioReplicationThread) getPool().getWorker();
+        if (worker == null) {
+            // No threads available, do nothing, the selection
+            // loop will keep calling this method until a
+            // thread becomes available.
+            // FIXME: This design could be improved.
+            if (log.isDebugEnabled())
+                log.debug("No TcpReplicationThread available");
+        } else {
+            // invoking this wakes up the worker thread then returns
+            worker.serviceChannel(key);
+        }
+    }
+
+
+}

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java?rev=418516&r1=418515&r2=418516&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java Sat Jul  1 15:52:11 2006
@@ -1,244 +1,256 @@
-/*
- * Copyright 1999,2004 The Apache Software Foundation.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.catalina.tribes.transport.nio;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-
-import org.apache.catalina.tribes.io.ObjectReader;
-import org.apache.catalina.tribes.transport.Constants;
-import org.apache.catalina.tribes.transport.WorkerThread;
-import org.apache.catalina.tribes.ChannelMessage;
-import org.apache.catalina.tribes.io.ListenCallback;
-import org.apache.catalina.tribes.io.ChannelData;
-import org.apache.catalina.tribes.io.BufferPool;
-import java.nio.channels.CancelledKeyException;
-
-/**
- * A worker thread class which can drain channels and echo-back the input. Each
- * instance is constructed with a reference to the owning thread pool object.
- * When started, the thread loops forever waiting to be awakened to service the
- * channel associated with a SelectionKey object. The worker is tasked by
- * calling its serviceChannel() method with a SelectionKey object. The
- * serviceChannel() method stores the key reference in the thread object then
- * calls notify() to wake it up. When the channel has been drained, the worker
- * thread returns itself to its parent pool.
- * 
- * @author Filip Hanik
- * 
- * @version $Revision: 378050 $, $Date: 2006-02-15 12:30:02 -0600 (Wed, 15 Feb 2006) $
- */
-public class NioReplicationThread extends WorkerThread {
-    
-    private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog( NioReplicationThread.class );
-    private ByteBuffer buffer = null;
-    private SelectionKey key;
-    private int rxBufSize;
-    private NioReceiver receiver;
-    public NioReplicationThread (ListenCallback callback, NioReceiver receiver)
-    {
-        super(callback);
-        this.receiver = receiver;
-    }
-
-    // loop forever waiting for work to do
-    public synchronized void run()
-    {
-        if ( (getOptions() & OPTION_DIRECT_BUFFER) == OPTION_DIRECT_BUFFER ) {
-            buffer = ByteBuffer.allocateDirect(getRxBufSize());
-        }else {
-            buffer = ByteBuffer.allocate (getRxBufSize());
-        }
-        while (isDoRun()) {
-            try {
-                // sleep and release object lock
-                this.wait();
-            } catch (InterruptedException e) {
-                if(log.isInfoEnabled())
-                    log.info("TCP worker thread interrupted in cluster",e);
-                // clear interrupt status
-                Thread.interrupted();
-            }
-            if (key == null) {
-                continue;	// just in case
-            }
-            try {
-                drainChannel (key);
-            } catch (Exception e) {
-                //this is common, since the sockets on the other
-                //end expire after a certain time.
-                if ( e instanceof CancelledKeyException ) {
-                    //do nothing
-                } else if ( e instanceof IOException ) {
-                    //dont spew out stack traces for IO exceptions unless debug is enabled.
-                    if (log.isDebugEnabled()) log.debug ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed.", e);
-                    else log.warn ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed.");
-                } else if ( log.isErrorEnabled() ) {
-                    //this is a real error, log it.
-                    log.error("Exception caught in TcpReplicationThread.drainChannel.",e);
-                } 
-
-                // close channel and nudge selector
-                try {
-                    key.channel().close();
-                } catch (IOException ex) {
-                    log.error("Unable to close channel.",ex);
-                }
-                key.selector().wakeup();
-            }
-            key = null;
-            // done, ready for more, return to pool
-            getPool().returnWorker (this);
-        }
-    }
-
-    /**
-     * Called to initiate a unit of work by this worker thread
-     * on the provided SelectionKey object.  This method is
-     * synchronized, as is the run() method, so only one key
-     * can be serviced at a given time.
-     * Before waking the worker thread, and before returning
-     * to the main selection loop, this key's interest set is
-     * updated to remove OP_READ.  This will cause the selector
-     * to ignore read-readiness for this channel while the
-     * worker thread is servicing it.
-     */
-    public synchronized void serviceChannel (SelectionKey key) {
-        this.key = key;
-        key.interestOps (key.interestOps() & (~SelectionKey.OP_READ));
-        key.interestOps (key.interestOps() & (~SelectionKey.OP_WRITE));
-        this.notify();		// awaken the thread
-    }
-
-    /**
-     * The actual code which drains the channel associated with
-     * the given key.  This method assumes the key has been
-     * modified prior to invocation to turn off selection
-     * interest in OP_READ.  When this method completes it
-     * re-enables OP_READ and calls wakeup() on the selector
-     * so the selector will resume watching this channel.
-     */
-    protected void drainChannel (final SelectionKey key) throws Exception {
-        SocketChannel channel = (SocketChannel) key.channel();
-        int count;
-        buffer.clear();			// make buffer empty
-        ObjectReader reader = (ObjectReader)key.attachment();
-        reader.setLastAccess(System.currentTimeMillis());
-        // loop while data available, channel is non-blocking
-        while ((count = channel.read (buffer)) > 0) {
-            buffer.flip();		// make buffer readable
-            if ( buffer.hasArray() ) 
-                reader.append(buffer.array(),0,count,false);
-            else 
-                reader.append(buffer,count,false);
-            buffer.clear();		// make buffer empty
-        }
-        
-        int pkgcnt = reader.count();
-
-        if ( pkgcnt > 0 ) {
-            ChannelMessage[] msgs = reader.execute();
-            for ( int i=0; i<msgs.length; i++ ) {
-                /**
-                 * Use send ack here if you want to ack the request to the remote 
-                 * server before completing the request
-                 * This is considered an asynchronized request
-                 */
-                if (ChannelData.sendAckAsync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND);
-                try {
-                    //process the message
-                    getCallback().messageDataReceived(msgs[i]);
-                    /**
-                     * Use send ack here if you want the request to complete on this 
-                     * server before sending the ack to the remote server
-                     * This is considered a synchronized request
-                     */
-                    if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND);
-                }catch ( Exception e ) {
-                    log.error("Processing of cluster message failed.",e);
-                    if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND);
-                }
-                if ( getUseBufferPool() ) {
-                    BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage());
-                    msgs[i].setMessage(null);
-                }
-            }                        
-        }
-
-        
-
-        
-        if (count < 0) {
-            // close channel on EOF, invalidates the key
-            if ( log.isDebugEnabled() ) log.debug("Channel closed on the remote end, disconnecting");
-            channel.close();
-            return;
-        }
-        
-        //acquire the interestOps mutex
-        Runnable r = new Runnable() {
-            public void run() {
-                try {
-                    if (key.isValid()) {
-                        // cycle the selector so this key is active again
-                        key.selector().wakeup();
-                        // resume interest in OP_READ, OP_WRITE
-                        int resumeOps = key.interestOps() | SelectionKey.OP_READ;
-                        key.interestOps(resumeOps);
-                    }
-                } catch (Exception x) {
-                    try {
-                        key.selector().close();
-                    } catch (Exception ignore) {}
-                    log.error("Unable to cycle the selector, connection disconnected?", x);
-                }
-            }
-        };
-        receiver.addEvent(r);
-        
-    }
-    
-    
-
-
-
-    /**
-     * send a reply-acknowledgement (6,2,3)
-     * @param key
-     * @param channel
-     */
-    protected void sendAck(SelectionKey key, SocketChannel channel, byte[] command) {
-        
-        try {
-            channel.write(ByteBuffer.wrap(command));
-            if (log.isTraceEnabled()) {
-                log.trace("ACK sent to " + channel.socket().getPort());
-            }
-        } catch ( java.io.IOException x ) {
-            log.warn("Unable to send ACK back through channel, channel disconnected?: "+x.getMessage());
-        }
-    }
-
-    public void setRxBufSize(int rxBufSize) {
-        this.rxBufSize = rxBufSize;
-    }
-
-    public int getRxBufSize() {
-        return rxBufSize;
-    }
-}
+/*
+ * Copyright 1999,2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.transport.nio;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+import org.apache.catalina.tribes.io.ObjectReader;
+import org.apache.catalina.tribes.transport.Constants;
+import org.apache.catalina.tribes.transport.WorkerThread;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.io.ListenCallback;
+import org.apache.catalina.tribes.io.ChannelData;
+import org.apache.catalina.tribes.io.BufferPool;
+import java.nio.channels.CancelledKeyException;
+
+/**
+ * A worker thread class which drains channels and sends acknowledgements for
+ * the messages it reads. Each instance is constructed with a ListenCallback and
+ * the NioReceiver it works for. When started, the thread loops waiting to be
+ * awakened to service the channel associated with a SelectionKey object.
+ * The worker is tasked by calling its serviceChannel() method with a
+ * SelectionKey object. The serviceChannel() method stores the key reference in
+ * the thread object then calls notify() to wake it up. When the channel has
+ * been drained, the worker thread returns itself to its parent pool.
+ * 
+ * @author Filip Hanik
+ * 
+ * @version $Revision: 378050 $, $Date: 2006-02-15 12:30:02 -0600 (Wed, 15 Feb 2006) $
+ */
+public class NioReplicationThread extends WorkerThread {
+    
+    private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog( NioReplicationThread.class );
+    private ByteBuffer buffer = null; // read buffer; allocated at the start of run()
+    private SelectionKey key; // key handed over by serviceChannel(); reset to null by run() after draining
+    private int rxBufSize; // capacity used when run() allocates the read buffer
+    private NioReceiver receiver; // receiver whose addEvent() is called by drainChannel() and cancelKey()
+    public NioReplicationThread (ListenCallback callback, NioReceiver receiver)
+    {
+        super(callback); // the callback is passed on to the WorkerThread superclass
+        this.receiver = receiver; // kept for the addEvent() calls made by this worker
+    }
+
+    // Worker loop: allocate the read buffer, then wait for a key from serviceChannel(), drain it and return to the pool.
+    public synchronized void run()
+    {
+        if ( (getOptions() & OPTION_DIRECT_BUFFER) == OPTION_DIRECT_BUFFER ) {
+            buffer = ByteBuffer.allocateDirect(getRxBufSize());
+        }else {
+            buffer = ByteBuffer.allocate (getRxBufSize());
+        }
+        while (isDoRun()) {
+            try {
+                // sleep (releasing the object lock) only when no key is pending; a key stored before we got here must not be missed
+                if (key == null) this.wait();
+            } catch (InterruptedException e) {
+                if(log.isInfoEnabled())
+                    log.info("TCP worker thread interrupted in cluster",e);
+                // clear interrupt status
+                Thread.interrupted();
+            }
+            if (key == null) {
+                continue;	// woken up without work (interrupt, shutdown or spurious wakeup)
+            }
+            try {
+                drainChannel (key);
+            } catch (Exception e) {
+                //this is common, since the sockets on the other
+                //end expire after a certain time.
+                if ( e instanceof CancelledKeyException ) {
+                    //do nothing
+                } else if ( e instanceof IOException ) {
+                    //dont spew out stack traces for IO exceptions unless debug is enabled.
+                    if (log.isDebugEnabled()) log.debug ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed.", e);
+                    else log.warn ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed.");
+                } else if ( log.isErrorEnabled() ) {
+                    //this is a real error, log it.
+                    log.error("Exception caught in TcpReplicationThread.drainChannel.",e);
+                } 
+                cancelKey(key); // whatever the failure was, queue a cancellation of the key on the receiver
+            }
+            key = null;
+            // done, ready for more, return to pool
+            getPool().returnWorker (this);
+        }
+    }
+
+    /**
+     * Called to initiate a unit of work by this worker thread
+     * on the provided SelectionKey object.  This method is
+     * synchronized, as is the run() method, so it cannot run
+     * while the worker is busy processing a previous key.
+     * Before waking the worker thread, and before returning
+     * to the caller, this key's interest set is updated to
+     * remove OP_READ and OP_WRITE, so the selector stops
+     * reporting those operations for the key.
+     * @param key the key whose channel this worker should drain
+     */
+    public synchronized void serviceChannel (SelectionKey key) {
+        this.key = key;
+        key.interestOps (key.interestOps() & (~SelectionKey.OP_READ));
+        key.interestOps (key.interestOps() & (~SelectionKey.OP_WRITE));
+        this.notify();		// awaken the thread
+    }
+
+    /**
+     * Drains the channel associated with the given key: reads the
+     * available bytes into the ObjectReader attached to the key,
+     * passes each message returned by reader.execute() to the callback
+     * (sending ACKs as the message options request), then either queues
+     * a key cancellation on EOF or queues an event on the receiver that
+     * re-enables OP_READ for the key and wakes up its selector.
+     */
+    protected void drainChannel (final SelectionKey key) throws Exception {
+        SocketChannel channel = (SocketChannel) key.channel();
+        int count;
+        buffer.clear();			// make buffer empty
+        ObjectReader reader = (ObjectReader)key.attachment();
+        reader.setLastAccess(System.currentTimeMillis());
+        try {
+            reader.access(); // paired with reader.finish() in the finally block below
+
+            // loop while data available, channel is non-blocking
+            while ((count = channel.read (buffer)) > 0) {
+                buffer.flip();		// make buffer readable
+                if ( buffer.hasArray() ) 
+                    reader.append(buffer.array(),0,count,false);
+                else 
+                    reader.append(buffer,count,false);
+                buffer.clear();		// make buffer empty
+            }
+
+            int pkgcnt = reader.count();
+
+            if ( pkgcnt > 0 ) {
+                ChannelMessage[] msgs = reader.execute();
+                for ( int i=0; i<msgs.length; i++ ) {
+                    /**
+                     * Use send ack here if you want to ack the request to the remote 
+                     * server before completing the request
+                     * This is considered an asynchronous request
+                     */
+                    if (ChannelData.sendAckAsync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND);
+                    try {
+                        //process the message
+                        getCallback().messageDataReceived(msgs[i]);
+                        /**
+                         * Use send ack here if you want the request to complete on this 
+                         * server before sending the ack to the remote server
+                         * This is considered a synchronous request
+                         */
+                        if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND);
+                    }catch ( Exception e ) {
+                        log.error("Processing of cluster message failed.",e);
+                        if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND);
+                    }
+                    if ( getUseBufferPool() ) {
+                        BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage());
+                        msgs[i].setMessage(null);
+                    }
+                }                        
+            }
+        } finally {
+            reader.finish();
+        }
+
+        
+        if (count < 0) {
+            // EOF on the channel: queue a cancellation of the key on the receiver
+            if ( log.isDebugEnabled() ) log.debug("Channel closed on the remote end, disconnecting");
+            cancelKey(key);
+            return;
+        }
+        
+        // build an event that re-registers OP_READ interest; it is handed to the receiver below
+        Runnable r = new Runnable() {
+            public void run() {
+                try {
+                    if (key.isValid()) {
+                        // cycle the selector so this key is active again
+                        key.selector().wakeup();
+                        // resume interest in OP_READ only; the other interest bits are left as they are
+                        int resumeOps = key.interestOps() | SelectionKey.OP_READ;
+                        key.interestOps(resumeOps);
+                    }
+                } catch (CancelledKeyException ckx ) {
+                    NioReceiver.cancelledKey(key);
+                } catch (Exception x) {
+                    try {
+                        key.selector().close(); // NOTE(review): closes the whole selector, not just this key - confirm this is intended
+                    } catch (Exception ignore) {}
+                    log.error("Unable to cycle the selector, connection disconnected?", x);
+                }
+            }
+        };
+        receiver.addEvent(r);
+        
+    }
+
+    private void cancelKey(final SelectionKey key) {
+        // do not cancel here: wrap the NioReceiver.cancelledKey(key) call in an event and pass it to the receiver
+        receiver.addEvent(new Runnable() {
+            public void run() {
+                NioReceiver.cancelledKey(key);
+            }
+        });
+    }
+    
+    
+
+
+
+    /**
+     * Sends a reply-acknowledgement by writing the given command bytes to the channel.
+     * @param key the selection key of the channel (not used by this method)
+     * @param channel the socket channel the command is written to
+     * @param command the bytes to send; callers in this class pass Constants.ACK_COMMAND or Constants.FAIL_ACK_COMMAND
+     */
+    protected void sendAck(SelectionKey key, SocketChannel channel, byte[] command) {
+        try {
+            ByteBuffer pending = ByteBuffer.wrap(command);
+            // one write() call may take only part of the buffer, so repeat until it is empty
+            while ( pending.hasRemaining() ) {
+                channel.write(pending);
+            }
+            if (log.isTraceEnabled()) {
+                log.trace("ACK sent to " + channel.socket().getPort());
+            }
+        } catch ( java.io.IOException x ) {
+            log.warn("Unable to send ACK back through channel, channel disconnected?: "+x.getMessage());
+        }
+    }
+
+    public void setRxBufSize(int rxBufSize) {
+        this.rxBufSize = rxBufSize; // run() reads this value (via getRxBufSize()) when it allocates the read buffer
+    }
+
+    public int getRxBufSize() {
+        return rxBufSize; // capacity passed to ByteBuffer.allocate()/allocateDirect() in run()
+    }
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org