You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/11/26 15:16:02 UTC

svn commit: r598285 [2/2] - in /incubator/qpid/branches/M2.1.1/java: broker/ broker/etc/ broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/protocol/ broker/src/main/java/org/apache/qpid/server/transport/ client/sr...

Added: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java?rev=598285&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java (added)
+++ incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java Mon Nov 26 06:16:01 2007
@@ -0,0 +1,1034 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import edu.emory.mathcs.backport.java.util.concurrent.Executor;
+import edu.emory.mathcs.backport.java.util.concurrent.locks.ReentrantLock;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoFilter.WriteRequest;
+import org.apache.mina.common.WriteTimeoutException;
+import org.apache.mina.util.IdentityHashSet;
+import org.apache.mina.util.NamePreservingRunnable;
+import org.apache.mina.util.Queue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Performs all I/O operations for sockets which are connected or bound. This class is used by MINA internally.
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev$, $Date$,
+ */
+class MultiThreadSocketIoProcessor extends SocketIoProcessor
+{
+    // Loggers. The reader/writer loggers are children of the class logger; getName() is
+    // required here because concatenating a Class object with a String uses
+    // Class.toString(), which would yield a name starting with "class ".
+    Logger _logger = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class);
+    Logger _loggerRead = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class.getName() + ".Reader");
+    Logger _loggerWrite = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class.getName() + ".Writer");
+
+    /** Timeout, in milliseconds, passed to every select() call of both workers. */
+    private static final long SELECTOR_TIMEOUT = 1000L;
+
+    /** Upper bound on the bytes read from one session in a single read() pass. */
+    private static final int MAX_READ_BYTES_PER_SESSION = 524288; //512K
+    /** Upper bound on the bytes written to one session in a single doFlush(session) pass. */
+    private static final int MAX_FLUSH_BYTES_PER_SESSION = 524288; //512K
+
+    // Monitors guarding the read side (selector, readWorker, read keys) and the
+    // write side (writeSelector, writeWorker, write keys) respectively.
+    private final Object readLock = new Object();
+    private final Object writeLock = new Object();
+
+    private final String threadName;
+    private final Executor executor;
+
+    /** Ensures only one worker at a time applies queued traffic-mask updates. */
+    private final ReentrantLock trafficMaskUpdateLock = new ReentrantLock();
+
+    /**
+     * Selectors used by the read worker and the write worker. Each one is opened when its
+     * worker starts and reset to null when that worker shuts down.
+     *
+     * @noinspection FieldAccessedSynchronizedAndUnsynchronized
+     */
+    private volatile Selector selector, writeSelector;
+
+    // Work queues shared between callers and the two workers.
+    private final Queue newSessions = new Queue();
+    private final Queue removingSessions = new Queue();
+    private final BlockingQueue flushingSessions = new LinkedBlockingQueue();
+    // Identity set of sessions currently scheduled for (or in the middle of) flushing.
+    private final IdentityHashSet flushingSessionsSet = new IdentityHashSet();
+
+    private final Queue trafficControllingSessions = new Queue();
+
+    private ReadWorker readWorker;
+    private WriteWorker writeWorker;
+    // Timestamps of the last idle sweep performed by each worker.
+    private long lastIdleReadCheckTime = System.currentTimeMillis();
+    private long lastIdleWriteCheckTime = System.currentTimeMillis();
+
+    /**
+     * Creates a processor whose read and write workers are run on the given executor.
+     *
+     * @param threadName base name; the workers append "Reader" / "Writer" to it
+     * @param executor   executor that runs the two worker loops
+     */
+    MultiThreadSocketIoProcessor(String threadName, Executor executor)
+    {
+        super(threadName, executor);
+        this.executor = executor;
+        this.threadName = threadName;
+    }
+
+    /**
+     * Queues a newly connected session for registration with both selectors, makes sure
+     * the read and write workers are running, and wakes both selectors so the session is
+     * picked up promptly.
+     *
+     * @param session the session to register
+     * @throws IOException if a selector cannot be opened while starting a worker
+     */
+    void addNew( SocketSessionImpl session ) throws IOException
+    {
+        synchronized( newSessions )
+        {
+            newSessions.push( session );
+        }
+
+        startupWorker();
+
+        // A worker that shuts down resets its selector field to null, so read each field
+        // once into a local and null-check it, exactly as flush() and updateTrafficMask() do.
+        Selector readSelector = this.selector;
+        if( readSelector != null )
+        {
+            readSelector.wakeup();
+        }
+
+        Selector flushSelector = this.writeSelector;
+        if( flushSelector != null )
+        {
+            flushSelector.wakeup();
+        }
+    }
+
+    /**
+     * Schedules the session for removal, makes sure the workers are running, and wakes the
+     * read selector so the removal is processed promptly.
+     *
+     * @param session the session to remove
+     * @throws IOException if a selector cannot be opened while starting a worker
+     */
+    void remove( SocketSessionImpl session ) throws IOException
+    {
+        scheduleRemove( session );
+        startupWorker();
+
+        // The read worker may already have found nothing registered and shut itself down,
+        // which sets the selector field to null; guard against that instead of throwing
+        // a NullPointerException to the caller.
+        Selector readSelector = this.selector;
+        if( readSelector != null )
+        {
+            readSelector.wakeup();
+        }
+    }
+
+    /**
+     * Lazily starts the read worker and the write worker.
+     * <p>
+     * For each worker that is currently null, a fresh {@link Selector} is opened, a new
+     * worker instance is created and handed to the executor wrapped in a
+     * NamePreservingRunnable. The read half is guarded by readLock and the write half by
+     * writeLock, the same monitors the workers hold when they shut themselves down.
+     *
+     * @throws IOException if {@link Selector#open()} fails
+     */
+    private void startupWorker() throws IOException
+    {
+        synchronized(readLock)
+        {
+            if (readWorker == null)
+            {
+                // The selector is assigned before the worker is submitted so the worker
+                // never observes a null selector on its first select().
+                selector = Selector.open();
+                readWorker = new ReadWorker();
+                executor.execute(new NamePreservingRunnable(readWorker));
+            }
+        }
+
+        synchronized(writeLock)
+        {
+            if (writeWorker == null)
+            {
+                writeSelector = Selector.open();
+                writeWorker = new WriteWorker();
+                executor.execute(new NamePreservingRunnable(writeWorker));
+            }
+        }
+
+    }
+
+    void flush( SocketSessionImpl session )
+    {
+        scheduleFlush( session );
+        Selector selector = this.writeSelector;
+
+        if( selector != null )
+        {
+            selector.wakeup();
+        }
+    }
+
+    void updateTrafficMask( SocketSessionImpl session )
+    {
+        scheduleTrafficControl( session );
+        Selector selector = this.selector;
+        if( selector != null )
+        {
+            selector.wakeup();
+        }
+    }
+
+    private void scheduleRemove( SocketSessionImpl session )
+    {
+        synchronized( removingSessions )
+        {
+            removingSessions.push( session );
+        }
+    }
+
+    /**
+     * Enqueues the session for flushing unless it is already tracked in
+     * flushingSessionsSet, so a session is queued at most once by this method.
+     *
+     * @param session the session with pending writes
+     */
+    private void scheduleFlush( SocketSessionImpl session )
+    {
+        synchronized(flushingSessionsSet)
+        {
+            boolean newlyTracked = flushingSessionsSet.add(session);
+            if (!newlyTracked)
+            {
+                return;
+            }
+
+            // If flushingSessions ever grew to hold Integer.MAX_VALUE sessions
+            // this offer would fail.
+            flushingSessions.offer(session);
+        }
+    }
+
+    private void scheduleTrafficControl( SocketSessionImpl session )
+    {
+        synchronized( trafficControllingSessions )
+        {
+            trafficControllingSessions.push( session );
+        }
+    }
+
+    /**
+     * Read-worker half of new-session registration.
+     * <p>
+     * Returns immediately when newSessions is empty. Otherwise it repeatedly peeks (does
+     * not remove) the head of newSessions, switches its channel to non-blocking mode,
+     * registers it with the read selector for OP_READ with the session as attachment,
+     * stores the key on the session, waits in awaitRegistration() and then calls
+     * sessionCreated(), which is what pops the session from the queue.
+     * <p>
+     * NOTE(review): when registration throws IOException the session is not removed from
+     * newSessions in this method, so the next loop iteration peeks the same session
+     * again. This looks like it could spin on a session whose channel is already
+     * closed - confirm and fix together with doAddNewWrite().
+     *
+     * @throws InterruptedException propagated from awaitRegistration() / sessionCreated()
+     */
+    private void doAddNewReader() throws InterruptedException
+    {
+        // Unsynchronized fast-path check; the loop below re-reads under the queue monitor.
+        if( newSessions.isEmpty() )
+        {
+            return;
+        }
+
+        for( ; ; )
+        {
+            MultiThreadSocketSessionImpl session;
+
+            synchronized( newSessions )
+            {
+                // peek, not pop: the write worker needs to see the same session too.
+                session = (MultiThreadSocketSessionImpl) newSessions.peek();
+            }
+
+            if( session == null )
+            {
+                break;
+            }
+
+            SocketChannel ch = session.getChannel();
+
+
+            try
+            {
+
+                ch.configureBlocking( false );
+                session.setSelectionKey( ch.register( selector,
+                                                      SelectionKey.OP_READ,
+                                                      session ) );
+
+
+                //System.out.println("ReadDebug:"+"Awaiting Registration");
+                session.awaitRegistration();
+                sessionCreated(session);
+            }
+            catch( IOException e )
+            {
+                // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute
+                // and call ConnectFuture.setException().
+                session.getFilterChain().fireExceptionCaught( session, e );
+            }
+        }
+    }
+
+
+    /**
+     * Write-worker half of new-session registration.
+     * <p>
+     * Returns immediately when newSessions is empty. Otherwise it repeatedly peeks (does
+     * not remove) the head of newSessions, switches its channel to non-blocking mode,
+     * adds the session to flushingSessionsSet, registers the channel with the write
+     * selector for OP_WRITE with the session as attachment, stores the key on the
+     * session, waits in awaitRegistration() and then calls sessionCreated(), which is
+     * what pops the session from the queue.
+     * <p>
+     * NOTE(review): when registration throws IOException the session is not removed from
+     * newSessions in this method, so the next loop iteration peeks the same session
+     * again. This looks like it could spin on a session whose channel is already
+     * closed - confirm and fix together with doAddNewReader().
+     *
+     * @throws InterruptedException propagated from awaitRegistration() / sessionCreated()
+     */
+    private void doAddNewWrite() throws InterruptedException
+    {
+        // Unsynchronized fast-path check; the loop below re-reads under the queue monitor.
+        if (newSessions.isEmpty())
+        {
+            return;
+        }
+
+        for (; ;)
+        {
+            MultiThreadSocketSessionImpl session;
+
+            synchronized(newSessions)
+            {
+                // peek, not pop: the read worker needs to see the same session too.
+                session = (MultiThreadSocketSessionImpl) newSessions.peek();
+            }
+
+            if (session == null)
+            {
+                break;
+            }
+
+            SocketChannel ch = session.getChannel();
+
+            try
+            {
+                ch.configureBlocking(false);
+                // Mark the session as tracked for flushing before OP_WRITE can fire, so
+                // that scheduleFlush() does not queue it a second time.
+                synchronized(flushingSessionsSet)
+                {
+                    flushingSessionsSet.add(session);
+                }
+
+                session.setWriteSelectionKey(ch.register(writeSelector,
+                                                         SelectionKey.OP_WRITE,
+                                                         session));
+
+                //System.out.println("WriteDebug:"+"Awaiting Registration");
+                session.awaitRegistration();
+                sessionCreated(session);
+            }
+            catch (IOException e)
+            {
+
+                // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute
+                // and call ConnectFuture.setException().
+                session.getFilterChain().fireExceptionCaught( session, e );
+            }
+        }
+    }
+
+
+
+    private void sessionCreated(SocketSessionImpl sessionParam) throws InterruptedException
+    {
+        MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam;
+        synchronized(newSessions)
+        {
+            if (!session.created())
+            {
+                _logger.debug("Popping new session");
+                newSessions.pop();
+
+                // AbstractIoFilterChain.CONNECT_FUTURE is cleared inside here
+                // in AbstractIoFilterChain.fireSessionOpened().
+                session.getServiceListeners().fireSessionCreated( session );
+
+                session.doneCreation();
+            }
+        }
+    }
+
+    /**
+     * Drains the removal queue: for each session, cancels its read and write selection
+     * keys, closes its channel, releases any queued write buffers and notifies the
+     * service listeners that the session was destroyed.
+     * <p>
+     * This method is invoked from both worker loops.
+     */
+    private void doRemove()
+    {
+        // Unsynchronized fast-path check; the loop below pops under the queue monitor.
+        if( removingSessions.isEmpty() )
+        {
+            return;
+        }
+
+        for( ; ; )
+        {
+            MultiThreadSocketSessionImpl session;
+
+            synchronized( removingSessions )
+            {
+                session = (MultiThreadSocketSessionImpl) removingSessions.pop();
+            }
+
+            if( session == null )
+            {
+                break;
+            }
+
+            SocketChannel ch = session.getChannel();
+            SelectionKey key = session.getReadSelectionKey();
+            SelectionKey writeKey = session.getWriteSelectionKey();
+
+            // Retry later if session is not yet fully initialized.
+            // (In case that Session.close() is called before addSession() is processed)
+            if (key == null || writeKey == null)
+            {
+                // Re-queue and stop for this pass; continuing would pop the same
+                // session straight back.
+                scheduleRemove( session );
+                break;
+            }
+            // skip if channel is already closed
+            // NOTE(review): this also skips when only one of the two keys is invalid,
+            // in which case the other key is not cancelled here - confirm intended.
+            if (!key.isValid() || !writeKey.isValid())
+            {
+                continue;
+            }
+
+            try
+            {
+                //System.out.println("ReadDebug:"+"Removing Session: " + System.identityHashCode(session));
+                synchronized(readLock)
+                {
+                    key.cancel();
+                }
+                synchronized(writeLock)
+                {
+                    writeKey.cancel();
+                }
+                ch.close();
+            }
+            catch( IOException e )
+            {
+                session.getFilterChain().fireExceptionCaught( session, e );
+            }
+            finally
+            {
+                // Runs whether or not close() failed: pending writes are failed and
+                // listeners are told the session is gone.
+                releaseWriteBuffers( session );
+                session.getServiceListeners().fireSessionDestroyed( session );
+            }
+        }
+    }
+
+    /**
+     * Handles the keys selected by the read selector: every key that is valid and
+     * readable, and whose session's traffic mask permits reading, is read under readLock.
+     * The selected-key set is cleared afterwards.
+     *
+     * @param selectedKeys the read selector's selected-key set
+     */
+    private void processRead(Set selectedKeys)
+    {
+        for (Iterator ready = selectedKeys.iterator(); ready.hasNext();)
+        {
+            SelectionKey readKey = ( SelectionKey ) ready.next();
+            MultiThreadSocketSessionImpl owner = (MultiThreadSocketSessionImpl) readKey.attachment();
+
+            synchronized(readLock)
+            {
+                boolean mayRead = readKey.isValid()
+                                  && readKey.isReadable()
+                                  && owner.getTrafficMask().isReadable();
+                if (mayRead)
+                {
+                    read( owner );
+                }
+            }
+        }
+
+        selectedKeys.clear();
+    }
+
+    /**
+     * Handles the keys selected by the write selector: for every key that is valid and
+     * writable, and whose session's traffic mask permits writing, OP_WRITE interest is
+     * cleared and the session is offered to the flushing queue. The selected-key set is
+     * cleared afterwards.
+     *
+     * @param selectedKeys the write selector's selected-key set
+     */
+    private void processWrite(Set selectedKeys)
+    {
+        for (Iterator ready = selectedKeys.iterator(); ready.hasNext();)
+        {
+            SelectionKey writeKey = (SelectionKey) ready.next();
+            SocketSessionImpl owner = (SocketSessionImpl) writeKey.attachment();
+
+            synchronized(writeLock)
+            {
+                boolean mayWrite = writeKey.isValid()
+                                   && writeKey.isWritable()
+                                   && owner.getTrafficMask().isWritable();
+                if (!mayWrite)
+                {
+                    continue;
+                }
+
+                // Clear OP_WRITE
+                int withoutWrite = writeKey.interestOps() & (~SelectionKey.OP_WRITE);
+                writeKey.interestOps(withoutWrite);
+
+                synchronized(flushingSessionsSet)
+                {
+                    flushingSessions.offer(owner);
+                }
+            }
+        }
+
+        selectedKeys.clear();
+    }
+
+    /**
+     * Reads available data from the session's channel and passes it up the filter chain.
+     * <p>
+     * Each pass allocates a buffer of the session's read-buffer size, drains the channel
+     * into it and fires messageReceived when anything was read. Another pass is made only
+     * when the buffer was filled completely, and only while the running total stays within
+     * MAX_READ_BYTES_PER_SESSION. End-of-stream schedules the session for removal.
+     * <p>
+     * Any exception ends reading for this invocation: previously the loop carried on after
+     * an exception, re-reading a failed channel and scheduling the same session for
+     * removal repeatedly.
+     *
+     * @param session the session whose channel is readable
+     */
+    private void read(SocketSessionImpl session)
+    {
+        int totalReadBytes = 0;
+
+        while (totalReadBytes <= MAX_READ_BYTES_PER_SESSION)
+        {
+            ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize());
+            SocketChannel ch = session.getChannel();
+
+            try
+            {
+                buf.clear();
+
+                int readBytes = 0;
+                int ret;
+
+                try
+                {
+                    while ((ret = ch.read(buf.buf())) > 0)
+                    {
+                        readBytes += ret;
+                        totalReadBytes += ret;
+                    }
+                }
+                finally
+                {
+                    // Flip even on failure so the buffer is in a consistent state for release.
+                    buf.flip();
+                }
+
+                if (readBytes > 0)
+                {
+                    session.increaseReadBytes(readBytes);
+
+                    session.getFilterChain().fireMessageReceived(session, buf);
+                    // Ownership passed to the filter chain; do not release it below.
+                    buf = null;
+                }
+
+                // The inner loop only exits with ret <= 0.
+                if (ret < 0)
+                {
+                    // End of stream: the peer closed the connection.
+                    scheduleRemove(session);
+                    break;
+                }
+
+                if (readBytes != session.getReadBufferSize())
+                {
+                    // Channel drained before the buffer filled up: nothing more to read now.
+                    break;
+                }
+                // Buffer was filled exactly; there may be more data, so go round again.
+            }
+            catch (Throwable e)
+            {
+                if (e instanceof IOException)
+                {
+                    scheduleRemove(session);
+                }
+                session.getFilterChain().fireExceptionCaught(session, e);
+
+                // Stop reading this session; looping again would hit the same failure.
+                break;
+            }
+            finally
+            {
+                if (buf != null)
+                {
+                    buf.release();
+                }
+            }
+        }
+    }
+
+
+    private void notifyReadIdleness()
+    {
+        // process idle sessions
+        long currentTime = System.currentTimeMillis();
+        if ((currentTime - lastIdleReadCheckTime) >= 1000)
+        {
+            lastIdleReadCheckTime = currentTime;
+            Set keys = selector.keys();
+            if( keys != null )
+            {
+                for( Iterator it = keys.iterator(); it.hasNext(); )
+                {
+                    SelectionKey key = ( SelectionKey ) it.next();
+                    SocketSessionImpl session = ( SocketSessionImpl ) key.attachment();
+                    notifyReadIdleness(session, currentTime);
+                }
+            }
+        }
+    }
+
+    private void notifyWriteIdleness()
+    {
+        // process idle sessions
+        long currentTime = System.currentTimeMillis();
+        if ((currentTime - lastIdleWriteCheckTime) >= 1000)
+        {
+            lastIdleWriteCheckTime = currentTime;
+            Set keys = writeSelector.keys();
+            if (keys != null)
+            {
+                for (Iterator it = keys.iterator(); it.hasNext();)
+                {
+                    SelectionKey key = (SelectionKey) it.next();
+                    SocketSessionImpl session = (SocketSessionImpl) key.attachment();
+                    notifyWriteIdleness(session, currentTime);
+                }
+            }
+        }
+    }
+
+    /**
+     * Read-side idle checks for one session: BOTH_IDLE first, then READER_IDLE, then the
+     * write-timeout check.
+     *
+     * @param session     the session to check
+     * @param currentTime the sweep timestamp in milliseconds
+     */
+    private void notifyReadIdleness(SocketSessionImpl session, long currentTime)
+    {
+        // Values for each check are sampled immediately before that check, because an
+        // earlier idle notification may update the session's idle bookkeeping.
+        long bothIdleLimit = session.getIdleTimeInMillis( IdleStatus.BOTH_IDLE );
+        long bothReference = Math.max( session.getLastIoTime(),
+                                       session.getLastIdleTime( IdleStatus.BOTH_IDLE ) );
+        notifyIdleness0( session, currentTime, bothIdleLimit, IdleStatus.BOTH_IDLE, bothReference );
+
+        long readerIdleLimit = session.getIdleTimeInMillis( IdleStatus.READER_IDLE );
+        long readerReference = Math.max( session.getLastReadTime(),
+                                         session.getLastIdleTime( IdleStatus.READER_IDLE ) );
+        notifyIdleness0( session, currentTime, readerIdleLimit, IdleStatus.READER_IDLE, readerReference );
+
+        long writeTimeoutLimit = session.getWriteTimeoutInMillis();
+        long lastWrite = session.getLastWriteTime();
+        notifyWriteTimeout( session, currentTime, writeTimeoutLimit, lastWrite );
+    }
+
+    /**
+     * Write-side idle checks for one session: BOTH_IDLE first, then WRITER_IDLE, then the
+     * write-timeout check.
+     *
+     * @param session     the session to check
+     * @param currentTime the sweep timestamp in milliseconds
+     */
+    private void notifyWriteIdleness(SocketSessionImpl session, long currentTime)
+    {
+        // Values for each check are sampled immediately before that check, because an
+        // earlier idle notification may update the session's idle bookkeeping.
+        long bothIdleLimit = session.getIdleTimeInMillis( IdleStatus.BOTH_IDLE );
+        long bothReference = Math.max( session.getLastIoTime(),
+                                       session.getLastIdleTime( IdleStatus.BOTH_IDLE ) );
+        notifyIdleness0( session, currentTime, bothIdleLimit, IdleStatus.BOTH_IDLE, bothReference );
+
+        long writerIdleLimit = session.getIdleTimeInMillis( IdleStatus.WRITER_IDLE );
+        long writerReference = Math.max( session.getLastWriteTime(),
+                                         session.getLastIdleTime( IdleStatus.WRITER_IDLE ) );
+        notifyIdleness0( session, currentTime, writerIdleLimit, IdleStatus.WRITER_IDLE, writerReference );
+
+        long writeTimeoutLimit = session.getWriteTimeoutInMillis();
+        long lastWrite = session.getLastWriteTime();
+        notifyWriteTimeout( session, currentTime, writeTimeoutLimit, lastWrite );
+    }
+
+    /**
+     * Fires a session-idle event when idle detection is enabled for the status
+     * (idleTime &gt; 0), a reference time exists (lastIoTime != 0) and at least idleTime
+     * milliseconds have passed since that reference.
+     *
+     * @param session     the session to notify
+     * @param currentTime the sweep timestamp in milliseconds
+     * @param idleTime    the configured idle threshold in milliseconds
+     * @param status      the idle status being checked
+     * @param lastIoTime  the reference timestamp the elapsed time is measured from
+     */
+    private void notifyIdleness0( SocketSessionImpl session, long currentTime,
+                                  long idleTime, IdleStatus status,
+                                  long lastIoTime )
+    {
+        if( idleTime <= 0 || lastIoTime == 0 )
+        {
+            return;
+        }
+
+        long elapsed = currentTime - lastIoTime;
+        if( elapsed < idleTime )
+        {
+            return;
+        }
+
+        session.increaseIdleCount( status );
+        session.getFilterChain().fireSessionIdle( session, status );
+    }
+
+    private void notifyWriteTimeout( SocketSessionImpl session,
+                                     long currentTime,
+                                     long writeTimeout, long lastIoTime )
+    {
+
+        MultiThreadSocketSessionImpl sesh = (MultiThreadSocketSessionImpl) session;
+        SelectionKey key = sesh.getWriteSelectionKey();
+
+        synchronized(writeLock)
+        {
+        if( writeTimeout > 0
+            && ( currentTime - lastIoTime ) >= writeTimeout
+            && key != null && key.isValid()
+            && ( key.interestOps() & SelectionKey.OP_WRITE ) != 0 )
+        {
+            session.getFilterChain().fireExceptionCaught( session, new WriteTimeoutException() );
+        }
+    }
+    }
+
+    private SocketSessionImpl getNextFlushingSession()
+    {
+        return (SocketSessionImpl) flushingSessions.poll();
+    }
+
+    /**
+     * Finishes a flush pass for the session. When the session still has scheduled write
+     * requests it is offered to the flushing queue again; otherwise it is removed from
+     * flushingSessionsSet so that a later scheduleFlush() can queue it afresh.
+     * <p>
+     * The decision is taken while holding the session's write-request queue monitor and
+     * then the flushingSessionsSet monitor, in that order.
+     *
+     * @param session the session that was just processed by the flusher
+     */
+    private void releaseSession(SocketSessionImpl session)
+    {
+        synchronized(session.getWriteRequestQueue())
+        {
+            synchronized(flushingSessionsSet)
+            {
+                if (session.getScheduledWriteRequests() > 0)
+                {
+                    // More writes arrived meanwhile: flush again.
+                    flushingSessions.offer(session);
+                }
+                else
+                {
+                    flushingSessionsSet.remove(session);
+                }
+            }
+        }
+    }
+
+    private void releaseWriteBuffers(SocketSessionImpl session)
+    {
+        Queue writeRequestQueue = session.getWriteRequestQueue();
+        WriteRequest req;
+
+        //Should this be synchronized?
+        synchronized(writeRequestQueue)
+        {
+            while ((req = (WriteRequest) writeRequestQueue.pop()) != null)
+            {
+                try
+                {
+                    ((ByteBuffer) req.getMessage()).release();
+                }
+                catch (IllegalStateException e)
+                {
+                    session.getFilterChain().fireExceptionCaught(session, e);
+                }
+                finally
+                {
+                    req.getFuture().setWritten(false);
+                }
+            }
+        }
+    }
+
+    /**
+     * Drains the flushing queue, flushing each session in turn.
+     * <p>
+     * Disconnected sessions have their pending writes discarded. Sessions without a write
+     * key yet, or whose key is no longer valid, are released without writing. For the
+     * rest, doFlush(session) is invoked; the session is released only when that call
+     * reports that it finished, or when it fails with an IOException (in which case the
+     * session is also scheduled for removal).
+     */
+    private void doFlush()
+    {
+        MultiThreadSocketSessionImpl session;
+
+        while ((session = (MultiThreadSocketSessionImpl) getNextFlushingSession()) != null)
+        {
+            if( !session.isConnected() )
+            {
+                releaseWriteBuffers( session );
+                releaseSession(session);
+                continue;
+            }
+
+            SelectionKey key = session.getWriteSelectionKey();
+            // Retry later if session is not yet fully initialized.
+            // (In case that Session.write() is called before addSession() is processed)
+            if( key == null )
+            {
+                scheduleFlush( session );
+                releaseSession(session);
+                continue;
+            }
+            // skip if channel is already closed
+            if( !key.isValid() )
+            {
+                releaseSession(session);
+                continue;
+            }
+
+            try
+            {
+                // When doFlush(session) returns false the session is deliberately not
+                // released here; that call has set OP_WRITE interest on the key instead.
+                if (doFlush(session))
+                {
+                    releaseSession(session);
+                }
+            }
+            catch( IOException e )
+            {
+                releaseSession(session);
+                scheduleRemove( session );
+                session.getFilterChain().fireExceptionCaught( session, e );
+            }
+
+        }
+
+    }
+
+    /**
+     * Writes as many queued requests of the session as the channel accepts, up to
+     * MAX_FLUSH_BYTES_PER_SESSION bytes per invocation.
+     * <p>
+     * OP_WRITE interest is cleared on entry. Requests whose buffer is fully written are
+     * popped from the queue and reported via messageSent. When a buffer could not be
+     * written completely, or the per-invocation byte cap has been reached, OP_WRITE
+     * interest is set again so the write selector resumes the flush later.
+     * <p>
+     * The cap comparison used to be inverted (total &lt;= cap), which ended the flush
+     * after every single write and forced a selector round trip per request.
+     *
+     * @param sessionParam the session to flush; must be a MultiThreadSocketSessionImpl
+     * @return true when the write queue was drained, false when the flush has to be
+     *         resumed once the channel is writable again
+     * @throws IOException if writing to the channel fails
+     */
+    private boolean doFlush(SocketSessionImpl sessionParam) throws IOException
+    {
+        MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam;
+        // Clear OP_WRITE
+        SelectionKey key = session.getWriteSelectionKey();
+        synchronized(writeLock)
+        {
+            key.interestOps( key.interestOps() & ( ~SelectionKey.OP_WRITE ) );
+        }
+        SocketChannel ch = session.getChannel();
+        Queue writeRequestQueue = session.getWriteRequestQueue();
+
+        long totalFlushedBytes = 0;
+        for( ; ; )
+        {
+            WriteRequest req;
+
+            synchronized( writeRequestQueue )
+            {
+                // Look at the head without removing it; it is popped once fully written.
+                req = ( WriteRequest ) writeRequestQueue.first();
+            }
+
+            if( req == null )
+            {
+                break;
+            }
+
+            ByteBuffer buf = ( ByteBuffer ) req.getMessage();
+            if( buf.remaining() == 0 )
+            {
+                synchronized( writeRequestQueue )
+                {
+                    writeRequestQueue.pop();
+                }
+
+                session.increaseWrittenMessages();
+
+                buf.reset();
+                session.getFilterChain().fireMessageSent( session, req );
+                continue;
+            }
+
+            // Reported as DIRMINA-362
+            // note: todo: fixme: Not sure it is important, but if we see NotYetConnected
+            // exceptions or 100% CPU in the kernel then the missing key.isWritable()
+            // check before this write is the place to look.
+            int writtenBytes = ch.write(buf.buf());
+            totalFlushedBytes += writtenBytes;
+
+            if( writtenBytes > 0 )
+            {
+                session.increaseWrittenBytes( writtenBytes );
+            }
+
+            if (buf.hasRemaining() || (totalFlushedBytes >= MAX_FLUSH_BYTES_PER_SESSION))
+            {
+                // Kernel buffer is full, or this session has written its share for now.
+                synchronized (writeLock)
+                {
+                    key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+                }
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * Applies queued traffic-mask changes to the sessions' selection keys.
+     * <p>
+     * Called from both worker loops. Only one worker applies updates at a time; a worker
+     * that finds the update lock taken returns immediately and leaves the queue to the
+     * current holder. For each queued session the read key's interest ops are set to
+     * OP_READ masked by the session's traffic mask, and, when the write queue is not
+     * empty, the write key's interest ops are set to OP_WRITE masked the same way.
+     */
+    private void doUpdateTrafficMask()
+    {
+        if (trafficControllingSessions.isEmpty())
+        {
+            return;
+        }
+
+        // Synchronize over entire operation as this method should be called
+        // from both read and write thread and we don't want the order of the
+        //  updates to get changed. tryLock() makes the "skip if busy" decision and the
+        // acquisition a single step.
+        if (!trafficMaskUpdateLock.tryLock())
+        {
+            return;
+        }
+
+        try
+        {
+            for (; ;)
+            {
+                MultiThreadSocketSessionImpl session;
+
+                // Producers push under the queue's own monitor (see
+                // scheduleTrafficControl), so the pop must take the same monitor; the
+                // update lock alone does not exclude them.
+                synchronized (trafficControllingSessions)
+                {
+                    session = (MultiThreadSocketSessionImpl) trafficControllingSessions.pop();
+                }
+
+                if (session == null)
+                {
+                    break;
+                }
+
+                SelectionKey key = session.getReadSelectionKey();
+                // Retry later if session is not yet fully initialized.
+                // (In case that Session.suspend??() or session.resume??() is
+                // called before addSession() is processed)
+                if (key == null)
+                {
+                    scheduleTrafficControl(session);
+                    break;
+                }
+                // skip if channel is already closed
+                if (!key.isValid())
+                {
+                    continue;
+                }
+
+                // The normal is OP_READ and, if there are write requests in the
+                // session's write queue, set OP_WRITE to trigger flushing.
+
+                // Set to Read and Write; if there is nothing to write then the cost
+                // is one loop through the flusher.
+                int ops = SelectionKey.OP_READ;
+
+                // Now mask the preferred ops with the mask of the current session
+                int mask = session.getTrafficMask().getInterestOps();
+                synchronized (readLock)
+                {
+                    key.interestOps(ops & mask);
+                }
+                //Change key to the WriteSelection Key
+                key = session.getWriteSelectionKey();
+                if (key != null && key.isValid())
+                {
+                    Queue writeRequestQueue = session.getWriteRequestQueue();
+                    synchronized (writeRequestQueue)
+                    {
+                        if (!writeRequestQueue.isEmpty())
+                        {
+                            ops = SelectionKey.OP_WRITE;
+                            synchronized (writeLock)
+                            {
+                                key.interestOps(ops & mask);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        finally
+        {
+            trafficMaskUpdateLock.unlock();
+        }
+
+    }
+
+    /**
+     * Worker loop for the write side. Each iteration selects on the write selector (with
+     * SELECTOR_TIMEOUT), registers new sessions, applies traffic-mask updates, processes
+     * writable keys, processes removals, runs the write idle sweep and flushes scheduled
+     * sessions. The loop ends, closing and nulling the write selector, once no keys are
+     * registered and no new sessions are waiting.
+     */
+    private class WriteWorker implements Runnable
+    {
+
+        public void run()
+        {
+            Thread.currentThread().setName(MultiThreadSocketIoProcessor.this.threadName + "Writer");
+
+            //System.out.println("WriteDebug:"+"Startup");
+            for (; ;)
+            {
+                try
+                {
+                    int nKeys = writeSelector.select(SELECTOR_TIMEOUT);
+
+                    doAddNewWrite();
+                    doUpdateTrafficMask();
+
+                    if (nKeys > 0)
+                    {
+                        //System.out.println("WriteDebug:"+nKeys + " keys from writeselector");
+                        processWrite(writeSelector.selectedKeys());
+                    }
+                    else
+                    {
+                        //System.out.println("WriteDebug:"+"No keys from writeselector");
+                    }
+
+                    doRemove();
+                    notifyWriteIdleness();
+
+                    // NOTE(review): size() is read here without the flushingSessionsSet
+                    // monitor that writers of the set hold - confirm this is acceptable.
+                    if (flushingSessionsSet.size() > 0)
+                    {
+                        doFlush();
+                    }
+
+                    if (writeSelector.keys().isEmpty())
+                    {
+                        // Re-check under writeLock, the monitor startupWorker() holds when
+                        // it decides whether a new write worker is needed.
+                        synchronized(writeLock)
+                        {
+
+                            if (writeSelector.keys().isEmpty() && newSessions.isEmpty())
+                            {
+                                writeWorker = null;
+                                try
+                                {
+                                    writeSelector.close();
+                                }
+                                catch (IOException e)
+                                {
+                                    ExceptionMonitor.getInstance().exceptionCaught(e);
+                                }
+                                finally
+                                {
+                                    writeSelector = null;
+                                }
+
+                                break;
+                            }
+                        }
+                    }
+
+                }
+                catch (Throwable t)
+                {
+                    // Report the failure, then pause for a second before the next iteration.
+                    ExceptionMonitor.getInstance().exceptionCaught(t);
+
+                    try
+                    {
+                        Thread.sleep(1000);
+                    }
+                    catch (InterruptedException e1)
+                    {
+                        ExceptionMonitor.getInstance().exceptionCaught(e1);
+                    }
+                }
+            }
+            //System.out.println("WriteDebug:"+"Shutdown");
+        }
+
+    }
+
+    /**
+     * Worker loop for the read side. Each iteration selects on the read selector (with
+     * SELECTOR_TIMEOUT), registers new sessions, applies traffic-mask updates, processes
+     * readable keys, processes removals and runs the read idle sweep. The loop ends,
+     * closing and nulling the read selector, once no keys are registered and no new
+     * sessions are waiting.
+     */
+    private class ReadWorker implements Runnable
+    {
+
+        public void run()
+        {
+            Thread.currentThread().setName(MultiThreadSocketIoProcessor.this.threadName + "Reader");
+
+            //System.out.println("ReadDebug:"+"Startup");
+            for( ; ; )
+            {
+                try
+                {
+                    int nKeys = selector.select(SELECTOR_TIMEOUT);
+
+                    doAddNewReader();
+                    doUpdateTrafficMask();
+
+                    if( nKeys > 0 )
+                    {
+                        //System.out.println("ReadDebug:"+nKeys + " keys from selector");
+
+                        processRead(selector.selectedKeys());
+                    }
+                    else
+                    {
+                        //System.out.println("ReadDebug:"+"No keys from selector");
+                    }
+
+
+                    doRemove();
+                    notifyReadIdleness();
+
+                    if( selector.keys().isEmpty() )
+                    {
+
+                        // Re-check under readLock, the monitor startupWorker() holds when
+                        // it decides whether a new read worker is needed.
+                        synchronized(readLock)
+                        {
+                            if( selector.keys().isEmpty() && newSessions.isEmpty() )
+                            {
+                                readWorker = null;
+                                try
+                                {
+                                    selector.close();
+                                }
+                                catch( IOException e )
+                                {
+                                    ExceptionMonitor.getInstance().exceptionCaught( e );
+                                }
+                                finally
+                                {
+                                    selector = null;
+                                }
+
+                                break;
+                            }
+                        }
+                    }
+                }
+                catch( Throwable t )
+                {
+                    // Report the failure, then pause for a second before the next iteration.
+                    ExceptionMonitor.getInstance().exceptionCaught( t );
+
+                    try
+                    {
+                        Thread.sleep( 1000 );
+                    }
+                    catch( InterruptedException e1 )
+                    {
+                        ExceptionMonitor.getInstance().exceptionCaught( e1 );
+                    }
+                }
+            }
+            //System.out.println("ReadDebug:"+"Shutdown");
+        }
+
+    }
+}

Propchange: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionConfigImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionConfigImpl.java?rev=598285&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionConfigImpl.java (added)
+++ incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionConfigImpl.java Mon Nov 26 06:16:01 2007
@@ -0,0 +1,240 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IoConnectorConfig;
+import org.apache.mina.common.support.BaseIoSessionConfig;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.SocketException;
+
+/**
+ * Socket session configuration used by the multi-threaded socket transport.
+ * <p>
+ * Extends {@link SocketSessionConfigImpl} and keeps every socket option as a
+ * plain field value. Each field starts at the default that a freshly created,
+ * unconnected {@link Socket} reported when this class was loaded. The setters
+ * only record the requested value; nothing in this class applies it to a socket.
+ * <p>
+ * The static {@code is...Available()} methods report which optional socket
+ * operations succeeded on that probe socket. They are static, so a caller gets
+ * the flags of whichever class it names in the call.
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class MultiThreadSocketSessionConfigImpl extends org.apache.mina.transport.socket.nio.SocketSessionConfigImpl
+{
+    // Capability flags, filled in once by initialize().
+    private static boolean SET_RECEIVE_BUFFER_SIZE_AVAILABLE = false;
+    private static boolean SET_SEND_BUFFER_SIZE_AVAILABLE = false;
+    private static boolean GET_TRAFFIC_CLASS_AVAILABLE = false;
+    private static boolean SET_TRAFFIC_CLASS_AVAILABLE = false;
+
+    // Defaults read from the probe socket, filled in once by initialize().
+    private static boolean DEFAULT_REUSE_ADDRESS;
+    private static int DEFAULT_RECEIVE_BUFFER_SIZE;
+    private static int DEFAULT_SEND_BUFFER_SIZE;
+    private static int DEFAULT_TRAFFIC_CLASS;
+    private static boolean DEFAULT_KEEP_ALIVE;
+    private static boolean DEFAULT_OOB_INLINE;
+    private static int DEFAULT_SO_LINGER;
+    private static boolean DEFAULT_TCP_NO_DELAY;
+
+    static
+    {
+        initialize();
+    }
+
+    /**
+     * Reads the default socket options from a throw-away unconnected socket and
+     * records which optional operations work on it.
+     *
+     * @throws ExceptionInInitializerError if one of the mandatory defaults cannot be read
+     */
+    private static void initialize()
+    {
+        Socket socket = new Socket();
+
+        try
+        {
+            DEFAULT_REUSE_ADDRESS = socket.getReuseAddress();
+            DEFAULT_RECEIVE_BUFFER_SIZE = socket.getReceiveBufferSize();
+            DEFAULT_SEND_BUFFER_SIZE = socket.getSendBufferSize();
+            DEFAULT_KEEP_ALIVE = socket.getKeepAlive();
+            DEFAULT_OOB_INLINE = socket.getOOBInline();
+            DEFAULT_SO_LINGER = socket.getSoLinger();
+            DEFAULT_TCP_NO_DELAY = socket.getTcpNoDelay();
+
+            // Check if setReceiveBufferSize is supported.
+            try
+            {
+                socket.setReceiveBufferSize(DEFAULT_RECEIVE_BUFFER_SIZE);
+                SET_RECEIVE_BUFFER_SIZE_AVAILABLE = true;
+            }
+            catch( SocketException e )
+            {
+                SET_RECEIVE_BUFFER_SIZE_AVAILABLE = false;
+            }
+
+            // Check if setSendBufferSize is supported.
+            try
+            {
+                socket.setSendBufferSize(DEFAULT_SEND_BUFFER_SIZE);
+                SET_SEND_BUFFER_SIZE_AVAILABLE = true;
+            }
+            catch( SocketException e )
+            {
+                SET_SEND_BUFFER_SIZE_AVAILABLE = false;
+            }
+
+            // Check if getTrafficClass is supported.
+            try
+            {
+                DEFAULT_TRAFFIC_CLASS = socket.getTrafficClass();
+                GET_TRAFFIC_CLASS_AVAILABLE = true;
+            }
+            catch( SocketException e )
+            {
+                GET_TRAFFIC_CLASS_AVAILABLE = false;
+                DEFAULT_TRAFFIC_CLASS = 0;
+            }
+
+            // Check if setTrafficClass is supported. Without this probe the flag
+            // would stay false and isSetTrafficClassAvailable() could never
+            // report true. Re-applying the default leaves the probe socket unchanged.
+            try
+            {
+                socket.setTrafficClass(DEFAULT_TRAFFIC_CLASS);
+                SET_TRAFFIC_CLASS_AVAILABLE = true;
+            }
+            catch( SocketException e )
+            {
+                SET_TRAFFIC_CLASS_AVAILABLE = false;
+            }
+        }
+        catch( SocketException e )
+        {
+            throw new ExceptionInInitializerError(e);
+        }
+        finally
+        {
+            // The probe socket is always non-null here; a failed close is only reported.
+            try
+            {
+                socket.close();
+            }
+            catch( IOException e )
+            {
+                ExceptionMonitor.getInstance().exceptionCaught(e);
+            }
+        }
+    }
+
+    /**
+     * @return <tt>true</tt> if setting the receive buffer size worked on the probe socket
+     */
+    public static boolean isSetReceiveBufferSizeAvailable()
+    {
+        return SET_RECEIVE_BUFFER_SIZE_AVAILABLE;
+    }
+
+    /**
+     * @return <tt>true</tt> if setting the send buffer size worked on the probe socket
+     */
+    public static boolean isSetSendBufferSizeAvailable()
+    {
+        return SET_SEND_BUFFER_SIZE_AVAILABLE;
+    }
+
+    /**
+     * @return <tt>true</tt> if reading the traffic class worked on the probe socket
+     */
+    public static boolean isGetTrafficClassAvailable()
+    {
+        return GET_TRAFFIC_CLASS_AVAILABLE;
+    }
+
+    /**
+     * @return <tt>true</tt> if setting the traffic class worked on the probe socket
+     */
+    public static boolean isSetTrafficClassAvailable()
+    {
+        return SET_TRAFFIC_CLASS_AVAILABLE;
+    }
+
+    // Per-instance option values, each starting from the probed default.
+    private boolean reuseAddress = DEFAULT_REUSE_ADDRESS;
+    private int receiveBufferSize = DEFAULT_RECEIVE_BUFFER_SIZE;
+    private int sendBufferSize = DEFAULT_SEND_BUFFER_SIZE;
+    private int trafficClass = DEFAULT_TRAFFIC_CLASS;
+    private boolean keepAlive = DEFAULT_KEEP_ALIVE;
+    private boolean oobInline = DEFAULT_OOB_INLINE;
+    private int soLinger = DEFAULT_SO_LINGER;
+    private boolean tcpNoDelay = DEFAULT_TCP_NO_DELAY;
+
+    /**
+     * Creates a new instance whose options all hold the probed defaults.
+     */
+    MultiThreadSocketSessionConfigImpl()
+    {
+    }
+
+    /**
+     * @return the recorded reuse-address setting
+     */
+    public boolean isReuseAddress()
+    {
+        return reuseAddress;
+    }
+
+    /**
+     * Records the reuse-address setting.
+     */
+    public void setReuseAddress( boolean reuseAddress )
+    {
+        this.reuseAddress = reuseAddress;
+    }
+
+    /**
+     * @return the recorded receive buffer size
+     */
+    public int getReceiveBufferSize()
+    {
+        return receiveBufferSize;
+    }
+
+    /**
+     * Records the receive buffer size.
+     */
+    public void setReceiveBufferSize( int receiveBufferSize )
+    {
+        this.receiveBufferSize = receiveBufferSize;
+    }
+
+    /**
+     * @return the recorded send buffer size
+     */
+    public int getSendBufferSize()
+    {
+        return sendBufferSize;
+    }
+
+    /**
+     * Records the send buffer size.
+     */
+    public void setSendBufferSize( int sendBufferSize )
+    {
+        this.sendBufferSize = sendBufferSize;
+    }
+
+    /**
+     * @return the recorded traffic class
+     */
+    public int getTrafficClass()
+    {
+        return trafficClass;
+    }
+
+    /**
+     * Records the traffic class.
+     */
+    public void setTrafficClass( int trafficClass )
+    {
+        this.trafficClass = trafficClass;
+    }
+
+    /**
+     * @return the recorded keep-alive setting
+     */
+    public boolean isKeepAlive()
+    {
+        return keepAlive;
+    }
+
+    /**
+     * Records the keep-alive setting.
+     */
+    public void setKeepAlive( boolean keepAlive )
+    {
+        this.keepAlive = keepAlive;
+    }
+
+    /**
+     * @return the recorded OOB-inline setting
+     */
+    public boolean isOobInline()
+    {
+        return oobInline;
+    }
+
+    /**
+     * Records the OOB-inline setting.
+     */
+    public void setOobInline( boolean oobInline )
+    {
+        this.oobInline = oobInline;
+    }
+
+    /**
+     * @return the recorded linger value
+     */
+    public int getSoLinger()
+    {
+        return soLinger;
+    }
+
+    /**
+     * Records the linger value.
+     */
+    public void setSoLinger( int soLinger )
+    {
+        this.soLinger = soLinger;
+    }
+
+    /**
+     * @return the recorded TCP no-delay setting
+     */
+    public boolean isTcpNoDelay()
+    {
+        return tcpNoDelay;
+    }
+
+    /**
+     * Records the TCP no-delay setting.
+     */
+    public void setTcpNoDelay( boolean tcpNoDelay )
+    {
+        this.tcpNoDelay = tcpNoDelay;
+    }
+
+
+}

Propchange: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionConfigImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionConfigImpl.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionImpl.java?rev=598285&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionImpl.java (added)
+++ incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionImpl.java Mon Nov 26 06:16:01 2007
@@ -0,0 +1,488 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import org.apache.mina.common.IoFilter.WriteRequest;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoService;
+import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoSessionConfig;
+import org.apache.mina.common.RuntimeIOException;
+import org.apache.mina.common.TransportType;
+import org.apache.mina.common.support.BaseIoSessionConfig;
+import org.apache.mina.common.support.IoServiceListenerSupport;
+import org.apache.mina.util.Queue;
+
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * An {@link IoSession} over a TCP/IP socket channel, driven by a
+ * {@link MultiThreadSocketIoProcessor}.
+ * <p>
+ * The session holds two {@link SelectionKey}s (one for reading, one for
+ * writing), a latch that opens once {@link #awaitRegistration()} has been
+ * called twice, and a flag recording that creation has been completed.
+ * Socket options are read from and written to the channel's socket directly
+ * through the inner {@link SessionConfigImpl}.
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+class MultiThreadSocketSessionImpl extends SocketSessionImpl
+{
+    private final IoService manager;
+    private final IoServiceConfig serviceConfig;
+    private final SocketSessionConfig config = new SessionConfigImpl();
+    private final MultiThreadSocketIoProcessor ioProcessor;
+    private final MultiThreadSocketFilterChain filterChain;
+    private final SocketChannel ch;
+    private final Queue writeRequestQueue;
+    private final IoHandler handler;
+    private final SocketAddress remoteAddress;
+    private final SocketAddress localAddress;
+    private final SocketAddress serviceAddress;
+    private final IoServiceListenerSupport serviceListeners;
+    private SelectionKey readKey;
+    private SelectionKey writeKey;
+    private int readBufferSize;
+    // Opens after two countDown() calls; see awaitRegistration().
+    private CountDownLatch registeredReadyLatch = new CountDownLatch(2);
+    private AtomicBoolean created = new AtomicBoolean(false);
+
+    /**
+     * Creates a new instance and copies the initial socket options from the
+     * service configuration onto the channel's socket.
+     *
+     * @param manager        the service that owns this session
+     * @param ioProcessor    the processor for this session; it is cast to
+     *                       {@link MultiThreadSocketIoProcessor}
+     * @param listeners      the service listener support
+     * @param serviceConfig  the service configuration supplying the session defaults
+     * @param ch             the socket channel backing this session
+     * @param defaultHandler the handler returned by {@link #getHandler()}
+     * @param serviceAddress the address returned by {@link #getServiceAddress()}
+     */
+    MultiThreadSocketSessionImpl( IoService manager,
+                       SocketIoProcessor ioProcessor,
+                       IoServiceListenerSupport listeners,
+                       IoServiceConfig serviceConfig,
+                       SocketChannel ch,
+                       IoHandler defaultHandler,
+                       SocketAddress serviceAddress )
+    {
+        super( manager, ioProcessor, listeners, serviceConfig, ch, defaultHandler, serviceAddress );
+
+        this.manager = manager;
+        this.serviceListeners = listeners;
+        this.ioProcessor = ( MultiThreadSocketIoProcessor ) ioProcessor;
+        this.filterChain = new MultiThreadSocketFilterChain( this );
+        // SessionConfigImpl reads this field, so it is set before any option is applied below.
+        this.ch = ch;
+        this.writeRequestQueue = new Queue();
+        this.handler = defaultHandler;
+        // Both addresses are captured once, here, and returned as-is afterwards.
+        this.remoteAddress = ch.socket().getRemoteSocketAddress();
+        this.localAddress = ch.socket().getLocalSocketAddress();
+        this.serviceAddress = serviceAddress;
+        this.serviceConfig = serviceConfig;
+
+        // Apply the initial session settings, when the service supplies socket-level ones.
+        IoSessionConfig template = serviceConfig.getSessionConfig();
+        if( !( template instanceof SocketSessionConfig ) )
+        {
+            return;
+        }
+
+        SocketSessionConfig source = ( SocketSessionConfig ) template;
+        this.config.setKeepAlive( source.isKeepAlive() );
+        this.config.setOobInline( source.isOobInline() );
+        this.config.setReceiveBufferSize( source.getReceiveBufferSize() );
+        // Recorded explicitly because the setter above skips it when the operation is unavailable.
+        this.readBufferSize = source.getReceiveBufferSize();
+        this.config.setReuseAddress( source.isReuseAddress() );
+        this.config.setSendBufferSize( source.getSendBufferSize() );
+        this.config.setSoLinger( source.getSoLinger() );
+        this.config.setTcpNoDelay( source.isTcpNoDelay() );
+
+        // Only touch the traffic class when it actually differs.
+        if( this.config.getTrafficClass() != source.getTrafficClass() )
+        {
+            this.config.setTrafficClass( source.getTrafficClass() );
+        }
+    }
+
+    /**
+     * Counts the registration latch down by one and then blocks until it has
+     * reached zero. The latch starts at two, so the first caller waits for a
+     * second call.
+     *
+     * @throws InterruptedException if the waiting thread is interrupted
+     */
+    void awaitRegistration() throws InterruptedException
+    {
+        registeredReadyLatch.countDown();
+
+        registeredReadyLatch.await();
+    }
+
+    /**
+     * @return <tt>true</tt> once {@link #doneCreation()} has been called
+     * @throws InterruptedException declared by the signature; nothing in this method throws it
+     */
+    boolean created() throws InterruptedException
+    {
+        return created.get();
+    }
+
+    /**
+     * Marks this session as created.
+     */
+    void doneCreation()
+    {
+        created.set( true );
+    }
+
+    /**
+     * @return the service this session was constructed with
+     */
+    public IoService getService()
+    {
+        return this.manager;
+    }
+
+    /**
+     * @return the service configuration this session was constructed with
+     */
+    public IoServiceConfig getServiceConfig()
+    {
+        return this.serviceConfig;
+    }
+
+    /**
+     * @return the socket-backed configuration of this session
+     */
+    public IoSessionConfig getConfig()
+    {
+        return this.config;
+    }
+
+    /**
+     * @return the processor this session was constructed with
+     */
+    SocketIoProcessor getIoProcessor()
+    {
+        return this.ioProcessor;
+    }
+
+    /**
+     * @return the filter chain created for this session
+     */
+    public IoFilterChain getFilterChain()
+    {
+        return this.filterChain;
+    }
+
+    /**
+     * @return the socket channel backing this session
+     */
+    SocketChannel getChannel()
+    {
+        return this.ch;
+    }
+
+    /**
+     * @return the service listener support this session was constructed with
+     */
+    IoServiceListenerSupport getServiceListeners()
+    {
+        return this.serviceListeners;
+    }
+
+    /**
+     * @return the read selection key; same value as {@link #getReadSelectionKey()}
+     */
+    SelectionKey getSelectionKey()
+    {
+        return this.readKey;
+    }
+
+    /**
+     * @return the read selection key
+     */
+    SelectionKey getReadSelectionKey()
+    {
+        return this.readKey;
+    }
+
+    /**
+     * @return the write selection key
+     */
+    SelectionKey getWriteSelectionKey()
+    {
+        return this.writeKey;
+    }
+
+    /**
+     * Stores the read selection key.
+     */
+    void setSelectionKey(SelectionKey key)
+    {
+        readKey = key;
+    }
+
+    /**
+     * Stores the write selection key.
+     */
+    void setWriteSelectionKey(SelectionKey key)
+    {
+        writeKey = key;
+    }
+
+    /**
+     * @return the handler this session was constructed with
+     */
+    public IoHandler getHandler()
+    {
+        return this.handler;
+    }
+
+    /**
+     * Fires a filter-close event for this session on its filter chain.
+     */
+    protected void close0()
+    {
+        this.filterChain.fireFilterClose( this );
+    }
+
+    /**
+     * @return the queue of pending write requests; the counters below lock on it
+     */
+    Queue getWriteRequestQueue()
+    {
+        return this.writeRequestQueue;
+    }
+
+    /**
+     * @return the number of scheduled write requests
+     * @deprecated delegates to {@link #getScheduledWriteRequests()}
+     */
+    public int getScheduledWriteMessages()
+    {
+        return getScheduledWriteRequests();
+    }
+
+    /**
+     * @return the size of the write request queue, read while holding its lock
+     */
+    public int getScheduledWriteRequests()
+    {
+        final int pending;
+        synchronized( writeRequestQueue )
+        {
+            pending = writeRequestQueue.size();
+        }
+        return pending;
+    }
+
+    /**
+     * @return the byte size of the write request queue, read while holding its lock
+     */
+    public int getScheduledWriteBytes()
+    {
+        final int pendingBytes;
+        synchronized( writeRequestQueue )
+        {
+            pendingBytes = writeRequestQueue.byteSize();
+        }
+        return pendingBytes;
+    }
+
+    /**
+     * Fires a filter-write event for the given request on the filter chain.
+     */
+    protected void write0( WriteRequest writeRequest )
+    {
+        this.filterChain.fireFilterWrite( this, writeRequest );
+    }
+
+    /**
+     * @return {@link TransportType#SOCKET}
+     */
+    public TransportType getTransportType()
+    {
+        return TransportType.SOCKET;
+    }
+
+    /**
+     * @return the remote address captured from the socket at construction time;
+     *         the socket is not queried again
+     */
+    public SocketAddress getRemoteAddress()
+    {
+        return this.remoteAddress;
+    }
+
+    /**
+     * @return the local address captured from the socket at construction time;
+     *         the socket is not queried again
+     */
+    public SocketAddress getLocalAddress()
+    {
+        return this.localAddress;
+    }
+
+    /**
+     * @return the service address this session was constructed with
+     */
+    public SocketAddress getServiceAddress()
+    {
+        return this.serviceAddress;
+    }
+
+    /**
+     * Asks the processor to update the traffic mask for this session.
+     */
+    protected void updateTrafficMask()
+    {
+        ioProcessor.updateTrafficMask( this );
+    }
+
+    /**
+     * @return the read buffer size last recorded by the constructor or by
+     *         {@link SessionConfigImpl#setReceiveBufferSize(int)}
+     */
+    int getReadBufferSize()
+    {
+        return this.readBufferSize;
+    }
+
+    /**
+     * Session configuration that reads and writes each option straight on the
+     * channel's socket. Every {@link SocketException} is rethrown wrapped in a
+     * {@link RuntimeIOException}, except where noted on the traffic class getter.
+     */
+    private class SessionConfigImpl extends BaseIoSessionConfig implements SocketSessionConfig
+    {
+        public boolean isKeepAlive()
+        {
+            final boolean keepAlive;
+            try
+            {
+                keepAlive = ch.socket().getKeepAlive();
+            }
+            catch( SocketException cause )
+            {
+                throw new RuntimeIOException( cause );
+            }
+            return keepAlive;
+        }
+
+        public void setKeepAlive( boolean on )
+        {
+            try
+            {
+                ch.socket().setKeepAlive( on );
+            }
+            catch( SocketException cause )
+            {
+                throw new RuntimeIOException( cause );
+            }
+        }
+
+        public boolean isOobInline()
+        {
+            final boolean oobInline;
+            try
+            {
+                oobInline = ch.socket().getOOBInline();
+            }
+            catch( SocketException cause )
+            {
+                throw new RuntimeIOException( cause );
+            }
+            return oobInline;
+        }
+
+        public void setOobInline( boolean on )
+        {
+            try
+            {
+                ch.socket().setOOBInline( on );
+            }
+            catch( SocketException cause )
+            {
+                throw new RuntimeIOException( cause );
+            }
+        }
+
+        public boolean isReuseAddress()
+        {
+            final boolean reuseAddress;
+            try
+            {
+                reuseAddress = ch.socket().getReuseAddress();
+            }
+            catch( SocketException cause )
+            {
+                throw new RuntimeIOException( cause );
+            }
+            return reuseAddress;
+        }
+
+        public void setReuseAddress( boolean on )
+        {
+            try
+            {
+                ch.socket().setReuseAddress( on );
+            }
+            catch( SocketException cause )
+            {
+                throw new RuntimeIOException( cause );
+            }
+        }
+
+        public int getSoLinger()
+        {
+            final int linger;
+            try
+            {
+                linger = ch.socket().getSoLinger();
+            }
+            catch( SocketException cause )
+            {
+                throw new RuntimeIOException( cause );
+            }
+            return linger;
+        }
+
+        /**
+         * A negative value switches lingering off; any other value switches it
+         * on with that timeout.
+         */
+        public void setSoLinger( int linger )
+        {
+            final boolean enabled = linger >= 0;
+            try
+            {
+                ch.socket().setSoLinger( enabled, enabled ? linger : 0 );
+            }
+            catch( SocketException cause )
+            {
+                throw new RuntimeIOException( cause );
+            }
+        }
+
+        public boolean isTcpNoDelay()
+        {
+            final boolean noDelay;
+            try
+            {
+                noDelay = ch.socket().getTcpNoDelay();
+            }
+            catch( SocketException cause )
+            {
+                throw new RuntimeIOException( cause );
+            }
+            return noDelay;
+        }
+
+        public void setTcpNoDelay( boolean on )
+        {
+            try
+            {
+                ch.socket().setTcpNoDelay( on );
+            }
+            catch( SocketException cause )
+            {
+                throw new RuntimeIOException( cause );
+            }
+        }
+
+        /**
+         * Returns 0 when reading the traffic class is unavailable, and also when
+         * the read fails while setting the traffic class is unavailable.
+         */
+        public int getTrafficClass()
+        {
+            if( !SocketSessionConfigImpl.isGetTrafficClassAvailable() )
+            {
+                return 0;
+            }
+
+            try
+            {
+                return ch.socket().getTrafficClass();
+            }
+            catch( SocketException cause )
+            {
+                // Throw an exception only when setTrafficClass is also available.
+                if( SocketSessionConfigImpl.isSetTrafficClassAvailable() )
+                {
+                    throw new RuntimeIOException( cause );
+                }
+                return 0;
+            }
+        }
+
+        /**
+         * Does nothing when setting the traffic class is unavailable.
+         */
+        public void setTrafficClass( int tc )
+        {
+            if( !SocketSessionConfigImpl.isSetTrafficClassAvailable() )
+            {
+                return;
+            }
+
+            try
+            {
+                ch.socket().setTrafficClass( tc );
+            }
+            catch( SocketException cause )
+            {
+                throw new RuntimeIOException( cause );
+            }
+        }
+
+        public int getSendBufferSize()
+        {
+            final int size;
+            try
+            {
+                size = ch.socket().getSendBufferSize();
+            }
+            catch( SocketException cause )
+            {
+                throw new RuntimeIOException( cause );
+            }
+            return size;
+        }
+
+        /**
+         * Does nothing when setting the send buffer size is unavailable.
+         */
+        public void setSendBufferSize( int size )
+        {
+            if( !SocketSessionConfigImpl.isSetSendBufferSizeAvailable() )
+            {
+                return;
+            }
+
+            try
+            {
+                ch.socket().setSendBufferSize( size );
+            }
+            catch( SocketException cause )
+            {
+                throw new RuntimeIOException( cause );
+            }
+        }
+
+        public int getReceiveBufferSize()
+        {
+            final int size;
+            try
+            {
+                size = ch.socket().getReceiveBufferSize();
+            }
+            catch( SocketException cause )
+            {
+                throw new RuntimeIOException( cause );
+            }
+            return size;
+        }
+
+        /**
+         * Does nothing when setting the receive buffer size is unavailable.
+         * On success the session's read buffer size is updated to match.
+         */
+        public void setReceiveBufferSize( int size )
+        {
+            if( !SocketSessionConfigImpl.isSetReceiveBufferSizeAvailable() )
+            {
+                return;
+            }
+
+            try
+            {
+                ch.socket().setReceiveBufferSize( size );
+                MultiThreadSocketSessionImpl.this.readBufferSize = size;
+            }
+            catch( SocketException cause )
+            {
+                throw new RuntimeIOException( cause );
+            }
+        }
+    }
+}

Propchange: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionImpl.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date