You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by pr...@apache.org on 2006/11/05 20:36:43 UTC

svn commit: r471502 [2/2] - in /directory/branches/mina/1.2: core/src/main/java/org/apache/mina/common/ core/src/main/java/org/apache/mina/common/support/ core/src/main/java/org/apache/mina/filter/ core/src/main/java/org/apache/mina/filter/codec/ core/...

Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java (original)
+++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java Sun Nov  5 11:36:41 2006
@@ -26,17 +26,21 @@
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.util.Iterator;
+import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.ConnectFuture;
 import org.apache.mina.common.ExceptionMonitor;
 import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoFilter.WriteRequest;
 import org.apache.mina.common.IoHandler;
 import org.apache.mina.common.IoServiceConfig;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.IoSessionRecycler;
-import org.apache.mina.common.IoFilter.WriteRequest;
 import org.apache.mina.common.support.AbstractIoFilterChain;
 import org.apache.mina.common.support.BaseIoConnector;
 import org.apache.mina.common.support.DefaultConnectFuture;
@@ -44,9 +48,6 @@
 import org.apache.mina.transport.socket.nio.DatagramServiceConfig;
 import org.apache.mina.transport.socket.nio.DatagramSessionConfig;
 import org.apache.mina.util.NamePreservingRunnable;
-import org.apache.mina.util.Queue;
-
-import java.util.concurrent.Executor;
 
 /**
  * {@link IoConnector} for datagram transport (UDP/IP).
@@ -56,17 +57,18 @@
  */
 public class DatagramConnectorDelegate extends BaseIoConnector implements DatagramService
 {
-    private static volatile int nextId = 0;
+    private static final AtomicInteger nextId = new AtomicInteger( );
 
+    private final Object lock = new Object();
     private final IoConnector wrapper;
     private final Executor executor;
-    private final int id = nextId ++ ;
+    private final int id = nextId.getAndIncrement();
     private Selector selector;
     private DatagramConnectorConfig defaultConfig = new DatagramConnectorConfig();
-    private final Queue registerQueue = new Queue();
-    private final Queue cancelQueue = new Queue();
-    private final Queue flushingSessions = new Queue();
-    private final Queue trafficControllingSessions = new Queue();
+    private final Queue<RegistrationRequest> registerQueue = new ConcurrentLinkedQueue<RegistrationRequest>();
+    private final Queue<DatagramSessionImpl> cancelQueue = new ConcurrentLinkedQueue<DatagramSessionImpl>();
+    private final Queue<DatagramSessionImpl> flushingSessions = new ConcurrentLinkedQueue<DatagramSessionImpl>();
+    private final Queue<DatagramSessionImpl> trafficControllingSessions = new ConcurrentLinkedQueue<DatagramSessionImpl>();
     private Worker worker;
 
     /**
@@ -93,12 +95,12 @@
 
         if( !( address instanceof InetSocketAddress ) )
             throw new IllegalArgumentException( "Unexpected address type: "
-                                                + address.getClass() );
+                + address.getClass() );
 
         if( localAddress != null && !( localAddress instanceof InetSocketAddress ) )
         {
             throw new IllegalArgumentException( "Unexpected local address type: "
-                                                + localAddress.getClass() );
+                + localAddress.getClass() );
         }
 
         if( config == null )
@@ -160,33 +162,27 @@
         }
 
         RegistrationRequest request = new RegistrationRequest( ch, handler, config );
-        synchronized( this )
+        try
+        {
+            startupWorker();
+        }
+        catch( IOException e )
         {
             try
             {
-                startupWorker();
+                ch.disconnect();
+                ch.close();
             }
-            catch( IOException e )
+            catch( IOException e2 )
             {
-                try
-                {
-                    ch.disconnect();
-                    ch.close();
-                }
-                catch( IOException e2 )
-                {
-                    ExceptionMonitor.getInstance().exceptionCaught( e2 );
-                }
-
-                return DefaultConnectFuture.newFailedFuture( e );
+                ExceptionMonitor.getInstance().exceptionCaught( e2 );
             }
 
-            synchronized( registerQueue )
-            {
-                registerQueue.push( request );
-            }
+            return DefaultConnectFuture.newFailedFuture( e );
         }
 
+        registerQueue.add( request );
+
         selector.wakeup();
         return request;
     }
@@ -211,40 +207,37 @@
         this.defaultConfig = defaultConfig;
     }
 
-    private synchronized void startupWorker() throws IOException
+    private void startupWorker() throws IOException
     {
-        if( worker == null )
+        synchronized( lock )
         {
-            selector = Selector.open();
-            worker = new Worker();
-            executor.execute( new NamePreservingRunnable( worker ) );
+            if( worker == null )
+            {
+                selector = Selector.open();
+                worker = new Worker();
+                executor.execute( new NamePreservingRunnable( worker ) );
+            }
         }
     }
 
     public void closeSession( DatagramSessionImpl session )
     {
-        synchronized( this )
+        try
         {
-            try
-            {
-                startupWorker();
-            }
-            catch( IOException e )
-            {
-                // IOException is thrown only when Worker thread is not
-                // running and failed to open a selector.  We simply return
-                // silently here because it we can simply conclude that
-                // this session is not managed by this connector or
-                // already closed.
-                return;
-            }
-
-            synchronized( cancelQueue )
-            {
-                cancelQueue.push( session );
-            }
+            startupWorker();
+        }
+        catch( IOException e )
+        {
+            // IOException is thrown only when Worker thread is not
+            // running and failed to open a selector.  We simply return
+            // silently here because it we can simply conclude that
+            // this session is not managed by this connector or
+            // already closed.
+            return;
         }
 
+        cancelQueue.add( session );
+
         selector.wakeup();
     }
 
@@ -260,10 +253,7 @@
 
     private void scheduleFlush( DatagramSessionImpl session )
     {
-        synchronized( flushingSessions )
-        {
-            flushingSessions.push( session );
-        }
+        flushingSessions.add( session );
     }
 
     public void updateTrafficMask( DatagramSessionImpl session )
@@ -274,15 +264,11 @@
         {
             selector.wakeup();
         }
-        selector.wakeup();
     }
 
     private void scheduleTrafficControl( DatagramSessionImpl session )
     {
-        synchronized( trafficControllingSessions )
-        {
-            trafficControllingSessions.push( session );
-        }
+        trafficControllingSessions.add( session );
     }
 
     private void doUpdateTrafficMask()
@@ -290,14 +276,9 @@
         if( trafficControllingSessions.isEmpty() )
             return;
 
-        for( ;; )
+        for( ; ; )
         {
-            DatagramSessionImpl session;
-
-            synchronized( trafficControllingSessions )
-            {
-                session = ( DatagramSessionImpl ) trafficControllingSessions.pop();
-            }
+            DatagramSessionImpl session = trafficControllingSessions.poll();
 
             if( session == null )
                 break;
@@ -320,13 +301,9 @@
             // The normal is OP_READ and, if there are write requests in the
             // session's write queue, set OP_WRITE to trigger flushing.
             int ops = SelectionKey.OP_READ;
-            Queue writeRequestQueue = session.getWriteRequestQueue();
-            synchronized( writeRequestQueue )
+            if( !session.getWriteRequestQueue().isEmpty() )
             {
-                if( !writeRequestQueue.isEmpty() )
-                {
-                    ops |= SelectionKey.OP_WRITE;
-                }
+                ops |= SelectionKey.OP_WRITE;
             }
 
             // Now mask the preferred ops with the mask of the current session
@@ -341,7 +318,7 @@
         {
             Thread.currentThread().setName( "DatagramConnector-" + id );
 
-            for( ;; )
+            for( ; ; )
             {
                 try
                 {
@@ -360,7 +337,7 @@
 
                     if( selector.keys().isEmpty() )
                     {
-                        synchronized( DatagramConnectorDelegate.this )
+                        synchronized( lock )
                         {
                             if( selector.keys().isEmpty() &&
                                 registerQueue.isEmpty() &&
@@ -386,7 +363,7 @@
                 }
                 catch( IOException e )
                 {
-                    ExceptionMonitor.getInstance().exceptionCaught(  e );
+                    ExceptionMonitor.getInstance().exceptionCaught( e );
 
                     try
                     {
@@ -394,25 +371,26 @@
                     }
                     catch( InterruptedException e1 )
                     {
+                        ExceptionMonitor.getInstance().exceptionCaught( e1 );
                     }
                 }
             }
         }
     }
 
-    private void processReadySessions( Set keys )
+    private void processReadySessions( Set<SelectionKey> keys )
     {
-        Iterator it = keys.iterator();
+        Iterator<SelectionKey> it = keys.iterator();
         while( it.hasNext() )
         {
-            SelectionKey key = ( SelectionKey ) it.next();
+            SelectionKey key = it.next();
             it.remove();
 
             DatagramSessionImpl session = ( DatagramSessionImpl ) key.attachment();
 
-            DatagramSessionImpl replaceSession = getRecycledSession(session);
+            DatagramSessionImpl replaceSession = getRecycledSession( session );
 
-            if(replaceSession != null)
+            if( replaceSession != null )
             {
                 session = replaceSession;
             }
@@ -432,16 +410,16 @@
     private DatagramSessionImpl getRecycledSession( IoSession session )
     {
         IoSessionRecycler sessionRecycler = getSessionRecycler( session );
-        DatagramSessionImpl replaceSession = null;
 
-        if ( sessionRecycler != null )
+        if( sessionRecycler != null )
         {
-            synchronized ( sessionRecycler )
+            synchronized( sessionRecycler )
             {
-                replaceSession = ( DatagramSessionImpl ) sessionRecycler.recycle( session.getLocalAddress(), session
-                        .getRemoteAddress() );
+                DatagramSessionImpl replaceSession =
+                    ( DatagramSessionImpl ) sessionRecycler.recycle( session.getLocalAddress(),
+                        session.getRemoteAddress() );
 
-                if ( replaceSession != null )
+                if( replaceSession != null )
                 {
                     return replaceSession;
                 }
@@ -501,14 +479,9 @@
         if( flushingSessions.size() == 0 )
             return;
 
-        for( ;; )
+        for( ; ; )
         {
-            DatagramSessionImpl session;
-
-            synchronized( flushingSessions )
-            {
-                session = ( DatagramSessionImpl ) flushingSessions.pop();
-            }
+            DatagramSessionImpl session = flushingSessions.poll();
 
             if( session == null )
                 break;
@@ -528,15 +501,11 @@
     {
         DatagramChannel ch = session.getChannel();
 
-        Queue writeRequestQueue = session.getWriteRequestQueue();
+        Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
 
-        WriteRequest req;
-        for( ;; )
+        for( ; ; )
         {
-            synchronized( writeRequestQueue )
-            {
-                req = ( WriteRequest ) writeRequestQueue.first();
-            }
+            WriteRequest req = writeRequestQueue.peek();
 
             if( req == null )
                 break;
@@ -545,10 +514,7 @@
             if( buf.remaining() == 0 )
             {
                 // pop and fire event
-                synchronized( writeRequestQueue )
-                {
-                    writeRequestQueue.pop();
-                }
+                writeRequestQueue.poll();
 
                 session.increaseWrittenMessages();
                 buf.reset();
@@ -577,13 +543,10 @@
             else if( writtenBytes > 0 )
             {
                 key.interestOps( key.interestOps()
-                                 & ( ~SelectionKey.OP_WRITE ) );
+                    & ( ~SelectionKey.OP_WRITE ) );
 
                 // pop and fire event
-                synchronized( writeRequestQueue )
-                {
-                    writeRequestQueue.pop();
-                }
+                writeRequestQueue.poll();
 
                 session.increaseWrittenBytes( writtenBytes );
                 session.increaseWrittenMessages();
@@ -598,22 +561,18 @@
         if( registerQueue.isEmpty() )
             return;
 
-        for( ;; )
+        for( ; ; )
         {
-            RegistrationRequest req;
-            synchronized( registerQueue )
-            {
-                req = ( RegistrationRequest ) registerQueue.pop();
-            }
+            RegistrationRequest req = registerQueue.poll();
 
             if( req == null )
                 break;
 
             DatagramSessionImpl session = new DatagramSessionImpl(
-                    wrapper, this,
-                    req.config,
-                    req.channel, req.handler,
-                    req.channel.socket().getRemoteSocketAddress() );
+                wrapper, this,
+                req.config,
+                req.channel, req.handler,
+                req.channel.socket().getRemoteSocketAddress() );
 
             // AbstractIoFilterChain will notify the connect future.
             session.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, req );
@@ -623,14 +582,14 @@
             {
                 DatagramSessionImpl replaceSession = getRecycledSession( session );
 
-                if ( replaceSession != null )
+                if( replaceSession != null )
                 {
                     session = replaceSession;
                 }
                 else
                 {
                     SelectionKey key = req.channel.register( selector,
-                            SelectionKey.OP_READ, session );
+                        SelectionKey.OP_READ, session );
 
                     session.setSelectionKey( key );
                     buildFilterChain( req, session );
@@ -653,7 +612,7 @@
                         req.channel.disconnect();
                         req.channel.close();
                     }
-                    catch (IOException e)
+                    catch( IOException e )
                     {
                         ExceptionMonitor.getInstance().exceptionCaught( e );
                     }
@@ -674,13 +633,9 @@
         if( cancelQueue.isEmpty() )
             return;
 
-        for( ;; )
+        for( ; ; )
         {
-            DatagramSessionImpl session;
-            synchronized( cancelQueue )
-            {
-                session = ( DatagramSessionImpl ) cancelQueue.pop();
-            }
+            DatagramSessionImpl session = cancelQueue.poll();
 
             if( session == null )
                 break;

Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java (original)
+++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java Sun Nov  5 11:36:41 2006
@@ -6,49 +6,52 @@
  *  to you under the Apache License, Version 2.0 (the
  *  "License"); you may not use this file except in compliance
  *  with the License.  You may obtain a copy of the License at
- *  
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  *  Unless required by applicable law or agreed to in writing,
  *  software distributed under the License is distributed on an
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License. 
- *  
+ *  under the License.
+ *
  */
 package org.apache.mina.transport.socket.nio.support;
 
+import java.util.Queue;
+
 import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoFilter.WriteRequest;
 import org.apache.mina.common.IoFilterChain;
 import org.apache.mina.common.IoSession;
-import org.apache.mina.common.IoFilter.WriteRequest;
 import org.apache.mina.common.support.AbstractIoFilterChain;
-import org.apache.mina.util.Queue;
 
 /**
  * An {@link IoFilterChain} for datagram transport (UDP/IP).
- * 
+ *
  * @author The Apache Directory Project (mina-dev@directory.apache.org)
  */
-class DatagramFilterChain extends AbstractIoFilterChain {
+class DatagramFilterChain extends AbstractIoFilterChain
+{
 
     DatagramFilterChain( IoSession parent )
     {
         super( parent );
     }
-    
+
+    @Override
     protected void doWrite( IoSession session, WriteRequest writeRequest )
     {
         DatagramSessionImpl s = ( DatagramSessionImpl ) session;
-        Queue writeRequestQueue = s.getWriteRequestQueue();
-        
+        Queue<WriteRequest> writeRequestQueue = s.getWriteRequestQueue();
+
         // SocketIoProcessor.doFlush() will reset it after write is finished
-        // because the buffer will be passed with messageSent event. 
+        // because the buffer will be passed with messageSent event.
         ( ( ByteBuffer ) writeRequest.getMessage() ).mark();
         synchronized( writeRequestQueue )
         {
-            writeRequestQueue.push( writeRequest );
+            writeRequestQueue.add( writeRequest );
             if( writeRequestQueue.size() == 1 && session.getTrafficMask().isWritable() )
             {
                 // Notify DatagramService only when writeRequestQueue was empty.
@@ -57,13 +60,14 @@
         }
     }
 
+    @Override
     protected void doClose( IoSession session )
     {
         DatagramSessionImpl s = ( DatagramSessionImpl ) session;
         DatagramService manager = s.getManagerDelegate();
         if( manager instanceof DatagramConnectorDelegate )
         {
-            ( ( DatagramConnectorDelegate ) manager ).closeSession( s );
+            manager.closeSession( s );
         }
         else
         {

Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java (original)
+++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java Sun Nov  5 11:36:41 2006
@@ -6,16 +6,16 @@
  *  to you under the Apache License, Version 2.0 (the
  *  "License"); you may not use this file except in compliance
  *  with the License.  You may obtain a copy of the License at
- *  
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  *  Unless required by applicable law or agreed to in writing,
  *  software distributed under the License is distributed on an
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License. 
- *  
+ *  under the License.
+ *
  */
 package org.apache.mina.transport.socket.nio.support;
 
@@ -23,8 +23,12 @@
 import java.net.SocketException;
 import java.nio.channels.DatagramChannel;
 import java.nio.channels.SelectionKey;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.mina.common.BroadcastIoSession;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoFilter.WriteRequest;
 import org.apache.mina.common.IoFilterChain;
 import org.apache.mina.common.IoHandler;
 import org.apache.mina.common.IoService;
@@ -34,15 +38,13 @@
 import org.apache.mina.common.RuntimeIOException;
 import org.apache.mina.common.TransportType;
 import org.apache.mina.common.WriteFuture;
-import org.apache.mina.common.IoFilter.WriteRequest;
 import org.apache.mina.common.support.BaseIoSession;
 import org.apache.mina.transport.socket.nio.DatagramServiceConfig;
 import org.apache.mina.transport.socket.nio.DatagramSessionConfig;
-import org.apache.mina.util.Queue;
 
 /**
  * An {@link IoSession} for datagram transport (UDP/IP).
- * 
+ *
  * @author The Apache Directory Project (mina-dev@directory.apache.org)
  * @version $Rev$, $Date$
  */
@@ -54,7 +56,7 @@
     private final DatagramService managerDelegate;
     private final DatagramFilterChain filterChain;
     private final DatagramChannel ch;
-    private final Queue writeRequestQueue;
+    private final Queue<WriteRequest> writeRequestQueue;
     private final IoHandler handler;
     private final SocketAddress localAddress;
     private final SocketAddress serviceAddress;
@@ -75,7 +77,7 @@
         this.managerDelegate = managerDelegate;
         this.filterChain = new DatagramFilterChain( this );
         this.ch = ch;
-        this.writeRequestQueue = new Queue();
+        this.writeRequestQueue = new ConcurrentLinkedQueue<WriteRequest>( );
         this.handler = defaultHandler;
         this.remoteAddress = ch.socket().getRemoteSocketAddress();
 
@@ -147,7 +149,8 @@
     {
         return handler;
     }
-    
+
+    @Override
     protected void close0()
     {
         IoServiceConfig config = getServiceConfig();
@@ -158,21 +161,23 @@
         filterChain.fireFilterClose( this );
     }
 
-    Queue getWriteRequestQueue()
+    Queue<WriteRequest> getWriteRequestQueue()
     {
         return writeRequestQueue;
     }
-    
+
+    @Override
     public WriteFuture write( Object message, SocketAddress destination )
     {
         if( !this.config.isBroadcast() )
         {
             throw new IllegalStateException( "Non-broadcast session" );
         }
-        
+
         return super.write( message, destination );
     }
 
+    @Override
     protected void write0( WriteRequest writeRequest )
     {
         filterChain.fireFilterWrite( this, writeRequest );
@@ -180,18 +185,19 @@
 
     public int getScheduledWriteRequests()
     {
-        synchronized( writeRequestQueue )
-        {
-            return writeRequestQueue.size();
-        }
+        return writeRequestQueue.size();
     }
 
     public int getScheduledWriteBytes()
     {
-        synchronized( writeRequestQueue )
+        int byteSize = 0;
+
+        for( WriteRequest request : writeRequestQueue )
         {
-            return writeRequestQueue.byteSize();
+            byteSize += ( ( ByteBuffer ) request.getMessage() ).remaining();
         }
+
+        return byteSize;
     }
 
     public TransportType getTransportType()
@@ -219,6 +225,7 @@
         return serviceAddress;
     }
 
+    @Override
     protected void updateTrafficMask()
     {
         managerDelegate.updateTrafficMask( this );
@@ -229,8 +236,10 @@
         return readBufferSize;
     }
 
+    @SuppressWarnings({"CloneableClassWithoutClone"})
     private class SessionConfigImpl extends DatagramSessionConfigImpl implements DatagramSessionConfig
     {
+        @Override
         public int getReceiveBufferSize()
         {
             try
@@ -243,6 +252,7 @@
             }
         }
 
+        @Override
         public void setReceiveBufferSize( int receiveBufferSize )
         {
             if( DatagramSessionConfigImpl.isSetReceiveBufferSizeAvailable() )
@@ -259,6 +269,7 @@
             }
         }
 
+        @Override
         public boolean isBroadcast()
         {
             try
@@ -271,6 +282,7 @@
             }
         }
 
+        @Override
         public void setBroadcast( boolean broadcast )
         {
             try
@@ -283,6 +295,7 @@
             }
         }
 
+        @Override
         public int getSendBufferSize()
         {
             try
@@ -295,6 +308,7 @@
             }
         }
 
+        @Override
         public void setSendBufferSize( int sendBufferSize )
         {
             if( DatagramSessionConfigImpl.isSetSendBufferSizeAvailable() )
@@ -310,6 +324,7 @@
             }
         }
 
+        @Override
         public boolean isReuseAddress()
         {
             try
@@ -322,6 +337,7 @@
             }
         }
 
+        @Override
         public void setReuseAddress( boolean reuseAddress )
         {
             try
@@ -334,6 +350,7 @@
             }
         }
 
+        @Override
         public int getTrafficClass()
         {
             if( DatagramSessionConfigImpl.isGetTrafficClassAvailable() )
@@ -353,6 +370,7 @@
             }
         }
 
+        @Override
         public void setTrafficClass( int trafficClass )
         {
             if( DatagramSessionConfigImpl.isSetTrafficClassAvailable() )

Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java (original)
+++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java Sun Nov  5 11:36:41 2006
@@ -6,16 +6,16 @@
  *  to you under the Apache License, Version 2.0 (the
  *  "License"); you may not use this file except in compliance
  *  with the License.  You may obtain a copy of the License at
- *  
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  *  Unless required by applicable law or agreed to in writing,
  *  software distributed under the License is distributed on an
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License. 
- *  
+ *  under the License.
+ *
  */
 package org.apache.mina.transport.vmpipe.support;
 
@@ -35,6 +35,7 @@
         super( session );
     }
 
+    @Override
     public void fireMessageReceived( IoSession session, Object message )
     {
         VmPipeSessionImpl s = ( VmPipeSessionImpl ) session;
@@ -42,10 +43,7 @@
         {
             if( !s.getTrafficMask().isReadable() )
             {
-                synchronized( s.pendingDataQueue )
-                {
-                    s.pendingDataQueue.push( message );
-                }
+                s.pendingDataQueue.add( message );
             }
             else
             {
@@ -54,14 +52,15 @@
                 {
                     byteCount = ( ( ByteBuffer ) message ).remaining();
                 }
-                
+
                 s.increaseReadBytes( byteCount );
-                
+
                 super.fireMessageReceived( s, message );
             }
         }
     }
 
+    @Override
     protected void doWrite( IoSession session, WriteRequest writeRequest )
     {
         VmPipeSessionImpl s = ( VmPipeSessionImpl ) session;
@@ -69,19 +68,15 @@
         {
             if( s.isConnected() )
             {
-                
                 if( !s.getTrafficMask().isWritable() )
                 {
-                    synchronized( s.pendingDataQueue )
-                    {
-                        s.pendingDataQueue.push( writeRequest );
-                    }
+                    s.pendingDataQueue.add( writeRequest );
                 }
                 else
                 {
-                
+
                     Object message = writeRequest.getMessage();
-                    
+
                     int byteCount = 1;
                     Object messageCopy = message;
                     if( message instanceof ByteBuffer )
@@ -95,22 +90,23 @@
                         rb.reset();
                         messageCopy = wb;
                     }
-                    
+
                     s.increaseWrittenBytes( byteCount );
                     s.increaseWrittenMessages();
-    
+
                     s.getFilterChain().fireMessageSent( s, writeRequest );
                     s.getRemoteSession().getFilterChain()
                                 .fireMessageReceived( s.getRemoteSession(), messageCopy );
                 }
             }
-            else 
+            else
             {
                 writeRequest.getFuture().setWritten( false );
             }
         }
     }
 
+    @Override
     protected void doClose( IoSession session )
     {
         VmPipeSessionImpl s = ( VmPipeSessionImpl ) session;
@@ -123,5 +119,5 @@
             }
         }
     }
-    
+
 }

Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java (original)
+++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java Sun Nov  5 11:36:41 2006
@@ -6,21 +6,26 @@
  *  to you under the Apache License, Version 2.0 (the
  *  "License"); you may not use this file except in compliance
  *  with the License.  You may obtain a copy of the License at
- *  
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  *  Unless required by applicable law or agreed to in writing,
  *  software distributed under the License is distributed on an
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License. 
- *  
+ *  under the License.
+ *
  */
 package org.apache.mina.transport.vmpipe.support;
 
 import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
+import org.apache.mina.common.IoFilter.WriteRequest;
 import org.apache.mina.common.IoFilterChain;
 import org.apache.mina.common.IoHandler;
 import org.apache.mina.common.IoService;
@@ -28,22 +33,22 @@
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.IoSessionConfig;
 import org.apache.mina.common.TransportType;
-import org.apache.mina.common.IoFilter.WriteRequest;
 import org.apache.mina.common.support.BaseIoSession;
 import org.apache.mina.common.support.BaseIoSessionConfig;
 import org.apache.mina.common.support.IoServiceListenerSupport;
-import org.apache.mina.util.Queue;
 
 /**
  * A {@link IoSession} for in-VM transport (VM_PIPE).
- * 
+ *
  * @author The Apache Directory Project (mina-dev@directory.apache.org)
  * @version $Rev$, $Date$
  */
 public class VmPipeSessionImpl extends BaseIoSession
 {
-    private static final IoSessionConfig CONFIG = new BaseIoSessionConfig() {};
-    
+    private static final IoSessionConfig CONFIG = new BaseIoSessionConfig()
+    {
+    };
+
     private final IoService service;
     private final IoServiceConfig serviceConfig;
     private final IoServiceListenerSupport serviceListeners;
@@ -54,15 +59,15 @@
     private final VmPipeFilterChain filterChain;
     private final VmPipeSessionImpl remoteSession;
     final Object lock;
-    final Queue pendingDataQueue;
+    final BlockingQueue<Object> pendingDataQueue;
 
     /**
      * Constructor for client-side session.
      */
     public VmPipeSessionImpl(
-            IoService service, IoServiceConfig serviceConfig,
-            IoServiceListenerSupport serviceListeners, Object lock, SocketAddress localAddress,
-            IoHandler handler, VmPipe remoteEntry )
+        IoService service, IoServiceConfig serviceConfig,
+        IoServiceListenerSupport serviceListeners, Object lock, SocketAddress localAddress,
+        IoHandler handler, VmPipe remoteEntry )
     {
         this.service = service;
         this.serviceConfig = serviceConfig;
@@ -72,7 +77,7 @@
         this.remoteAddress = this.serviceAddress = remoteEntry.getAddress();
         this.handler = handler;
         this.filterChain = new VmPipeFilterChain( this );
-        this.pendingDataQueue = new Queue();
+        this.pendingDataQueue = new LinkedBlockingQueue<Object>();
 
         remoteSession = new VmPipeSessionImpl( this, remoteEntry );
     }
@@ -91,24 +96,24 @@
         this.handler = entry.getHandler();
         this.filterChain = new VmPipeFilterChain( this );
         this.remoteSession = remoteSession;
-        this.pendingDataQueue = new Queue();
+        this.pendingDataQueue = new LinkedBlockingQueue<Object>();
     }
-    
+
     public IoService getService()
     {
         return service;
     }
-    
+
     IoServiceListenerSupport getServiceListeners()
     {
         return serviceListeners;
     }
-    
+
     public IoServiceConfig getServiceConfig()
     {
         return serviceConfig;
     }
-    
+
     public IoSessionConfig getConfig()
     {
         return CONFIG;
@@ -118,7 +123,7 @@
     {
         return filterChain;
     }
-    
+
     public VmPipeSessionImpl getRemoteSession()
     {
         return remoteSession;
@@ -129,11 +134,13 @@
         return handler;
     }
 
+    @Override
     protected void close0()
     {
         filterChain.fireFilterClose( this );
     }
-    
+
+    @Override
     protected void write0( WriteRequest writeRequest )
     {
         this.filterChain.fireFilterWrite( this, writeRequest );
@@ -148,7 +155,7 @@
     {
         return 0;
     }
-    
+
     public TransportType getTransportType()
     {
         return TransportType.VM_PIPE;
@@ -163,33 +170,31 @@
     {
         return localAddress;
     }
-    
+
     public SocketAddress getServiceAddress()
     {
         return serviceAddress;
     }
 
+    @Override
     protected void updateTrafficMask()
     {
-        if( getTrafficMask().isReadable() || getTrafficMask().isWritable())
+        if( getTrafficMask().isReadable() || getTrafficMask().isWritable() )
         {
-            Object[] data;
-            synchronized( pendingDataQueue )
-            {
-                data = pendingDataQueue.toArray();
-                pendingDataQueue.clear();
-            }
-            
-            for( int i = 0; i < data.length; i++ )
+            List<Object> data = new ArrayList<Object>();
+
+            pendingDataQueue.drainTo( data );
+
+            for( Object aData : data )
             {
-                if( data[ i ] instanceof WriteRequest )
+                if( aData instanceof WriteRequest )
                 {
-                    WriteRequest wr = ( WriteRequest ) data[ i ];
+                    WriteRequest wr = ( WriteRequest ) aData;
                     filterChain.doWrite( this, wr );
                 }
                 else
                 {
-                    filterChain.fireMessageReceived( this, data[ i ] );
+                    filterChain.fireMessageReceived( this, aData );
                 }
             }
         }

Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/util/ExpiringMap.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/util/ExpiringMap.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/util/ExpiringMap.java (original)
+++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/util/ExpiringMap.java Sun Nov  5 11:36:41 2006
@@ -20,10 +20,8 @@
 package org.apache.mina.util;
 
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
-
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -125,7 +123,8 @@
         delegate.clear();
     }
 
-    public int hashCode()
+    @Override
+	public int hashCode()
     {
         return delegate.hashCode();
     }
@@ -135,7 +134,8 @@
         return delegate.keySet();
     }
 
-    public boolean equals( Object obj )
+    @Override
+	public boolean equals( Object obj )
     {
         return delegate.equals( obj );
     }
@@ -144,18 +144,13 @@
     {
         synchronized( inMap )
         {
-            Iterator inMapKeysIt = inMap.keySet().iterator();
-
-            while( inMapKeysIt.hasNext() )
-            {
-                Object key = inMapKeysIt.next();
-                Object value = inMap.get( key );
+			for ( Object key : inMap.keySet() ) {
+				Object value = inMap.get( key );
 
-                if( value instanceof ExpiringObject )
-                {
-                    delegate.put( key, value );
-                }
-            }
+				if ( value instanceof ExpiringObject ) {
+					delegate.put( key, value );
+				}
+			}
         }
     }
 
@@ -214,7 +209,7 @@
 
         private ReadWriteLock lastAccessTimeLock = new ReentrantReadWriteLock();
 
-        public ExpiringObject( Object key, Object value, long lastAccessTime )
+        ExpiringObject( Object key, Object value, long lastAccessTime )
         {
             if( value == null )
             {
@@ -264,12 +259,14 @@
             return value;
         }
 
-        public boolean equals( Object obj )
+        @Override
+		public boolean equals( Object obj )
         {
             return value.equals( obj );
         }
 
-        public int hashCode()
+        @Override
+		public int hashCode()
         {
             return value.hashCode();
         }
@@ -305,7 +302,9 @@
                 }
                 catch( InterruptedException e )
                 {
-                }
+					//Abort on interruption
+					return;
+				}
             }
         }
 
@@ -313,31 +312,24 @@
         {
             long timeNow = System.currentTimeMillis();
 
-            Iterator expiringObjectsIterator = delegate.values().iterator();
+			for ( Object o : delegate.values() ) {
+				ExpiringObject expObject = (ExpiringObject) o;
 
-            while( expiringObjectsIterator.hasNext() )
-            {
-                ExpiringObject expObject = ( ExpiringObject ) expiringObjectsIterator.next();
-
-                if( timeToLiveMillis <= 0 )
-                    continue;
-
-                long timeIdle = timeNow - expObject.getLastAccessTime();
+				if ( timeToLiveMillis <= 0 )
+					continue;
 
-                if( timeIdle >= timeToLiveMillis )
-                {
-                    delegate.remove( expObject.getKey() );
+				long timeIdle = timeNow - expObject.getLastAccessTime();
 
-                    Iterator listenerIterator = expirationListeners.iterator();
+				if ( timeIdle >= timeToLiveMillis ) {
+					delegate.remove( expObject.getKey() );
 
-                    while( listenerIterator.hasNext() )
-                    {
-                        ExpirationListener listener = ( ExpirationListener ) listenerIterator.next();
+					for ( Object expirationListener : expirationListeners ) {
+						ExpirationListener listener = (ExpirationListener) expirationListener;
 
-                        listener.expired( expObject.getValue() );
-                    }
-                }
-            }
+						listener.expired( expObject.getValue() );
+					}
+				}
+			}
         }
 
         public void startExpiring()

Modified: directory/branches/mina/1.2/core/src/test/java/org/apache/mina/filter/StreamWriteFilterTest.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/test/java/org/apache/mina/filter/StreamWriteFilterTest.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/core/src/test/java/org/apache/mina/filter/StreamWriteFilterTest.java (original)
+++ directory/branches/mina/1.2/core/src/test/java/org/apache/mina/filter/StreamWriteFilterTest.java Sun Nov  5 11:36:41 2006
@@ -6,16 +6,16 @@
  *  to you under the Apache License, Version 2.0 (the
  *  "License"); you may not use this file except in compliance
  *  with the License.  You may obtain a copy of the License at
- *  
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  *  Unless required by applicable law or agreed to in writing,
  *  software distributed under the License is distributed on an
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License. 
- *  
+ *  under the License.
+ *
  */
 package org.apache.mina.filter;
 
@@ -25,32 +25,32 @@
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.security.MessageDigest;
+import java.util.LinkedList;
+import java.util.Queue;
 import java.util.Random;
 
 import junit.framework.TestCase;
-
 import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.IdleStatus;
 import org.apache.mina.common.IoAcceptor;
 import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoFilter.NextFilter;
+import org.apache.mina.common.IoFilter.WriteRequest;
 import org.apache.mina.common.IoFutureListener;
 import org.apache.mina.common.IoHandlerAdapter;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.WriteFuture;
-import org.apache.mina.common.IoFilter.NextFilter;
-import org.apache.mina.common.IoFilter.WriteRequest;
 import org.apache.mina.common.support.DefaultWriteFuture;
 import org.apache.mina.transport.socket.nio.SocketAcceptor;
 import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
 import org.apache.mina.transport.socket.nio.SocketConnector;
 import org.apache.mina.util.AvailablePortFinder;
-import org.apache.mina.util.Queue;
 import org.easymock.AbstractMatcher;
 import org.easymock.MockControl;
 
 /**
  * Tests {@link StreamWriteFilter}.
- * 
+ *
  * @author The Apache Directory Project (mina-dev@directory.apache.org)
  * @version $Rev$, $Date$
  */
@@ -60,6 +60,7 @@
     IoSession session;
     NextFilter nextFilter;
 
+    @Override
     protected void setUp() throws Exception
     {
         super.setUp();
@@ -71,7 +72,7 @@
         mockNextFilter = MockControl.createControl( NextFilter.class );
         session = ( IoSession ) mockSession.getMock();
         nextFilter = ( NextFilter ) mockNextFilter.getMock();
-        
+
         session.getAttribute( StreamWriteFilter.CURRENT_STREAM );
         mockSession.setReturnValue( null );
     }
@@ -79,29 +80,29 @@
     public void testWriteEmptyStream() throws Exception
     {
         StreamWriteFilter filter = new StreamWriteFilter();
-        
+
         InputStream stream = new ByteArrayInputStream( new byte[ 0 ] );
         WriteRequest writeRequest = new WriteRequest( stream, new DummyWriteFuture() );
-        
+
         /*
          * Record expectations
          */
         nextFilter.messageSent( session, stream );
-        
+
         /*
          * Replay.
          */
         mockNextFilter.replay();
         mockSession.replay();
-        
+
         filter.filterWrite( nextFilter, session, writeRequest );
-        
+
         /*
          * Verify.
          */
         mockNextFilter.verify();
         mockSession.verify();
-        
+
         assertTrue( writeRequest.getFuture().isWritten() );
     }
 
@@ -112,10 +113,10 @@
     public void testWriteNonStreamMessage() throws Exception
     {
         StreamWriteFilter filter = new StreamWriteFilter();
-        
+
         Object message = new Object();
         WriteRequest writeRequest = new WriteRequest( message, new DummyWriteFuture() );
-        
+
         /*
          * Record expectations
          */
@@ -123,35 +124,35 @@
         session.getAttribute( StreamWriteFilter.CURRENT_STREAM );
         mockSession.setReturnValue( null );
         nextFilter.messageSent( session, message );
-        
+
         /*
          * Replay.
          */
         mockNextFilter.replay();
         mockSession.replay();
-        
+
         filter.filterWrite( nextFilter, session, writeRequest );
         filter.messageSent( nextFilter, session, message );
-        
+
         /*
          * Verify.
          */
         mockNextFilter.verify();
         mockSession.verify();
     }
-    
+
     /**
      * Tests when the contents of the stream fits into one write buffer.
      */
     public void testWriteSingleBufferStream() throws Exception
     {
         StreamWriteFilter filter = new StreamWriteFilter();
-        
+
         byte[] data = new byte[] { 1, 2, 3, 4 };
-        
+
         InputStream stream = new ByteArrayInputStream( data );
         WriteRequest writeRequest = new WriteRequest( stream, new DummyWriteFuture() );
-        
+
         /*
          * Record expectations
          */
@@ -161,7 +162,7 @@
         mockSession.setReturnValue(null);
         nextFilter.filterWrite( session, new WriteRequest( ByteBuffer.wrap( data ) ) );
         mockNextFilter.setMatcher( new WriteRequestMatcher() );
-        
+
         session.getAttribute( StreamWriteFilter.CURRENT_STREAM );
         mockSession.setReturnValue( stream );
         session.removeAttribute( StreamWriteFilter.CURRENT_STREAM );
@@ -171,25 +172,25 @@
         session.removeAttribute( StreamWriteFilter.WRITE_REQUEST_QUEUE );
         mockSession.setReturnValue( null );
         nextFilter.messageSent( session, stream );
-        
+
         /*
          * Replay.
          */
         mockNextFilter.replay();
         mockSession.replay();
-        
+
         filter.filterWrite( nextFilter, session, writeRequest );
         filter.messageSent( nextFilter, session, data );
-        
+
         /*
          * Verify.
          */
         mockNextFilter.verify();
         mockSession.verify();
-        
+
         assertTrue( writeRequest.getFuture().isWritten() );
     }
-    
+
     /**
      * Tests when the contents of the stream doesn't fit into one write buffer.
      */
@@ -197,15 +198,15 @@
     {
         StreamWriteFilter filter = new StreamWriteFilter();
         filter.setWriteBufferSize( 4 );
-        
+
         byte[] data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
         byte[] chunk1 = new byte[] { 1, 2, 3, 4 };
         byte[] chunk2 = new byte[] { 5, 6, 7, 8 };
         byte[] chunk3 = new byte[] { 9, 10 };
-        
+
         InputStream stream = new ByteArrayInputStream( data );
         WriteRequest writeRequest = new WriteRequest( stream, new DummyWriteFuture() );
-        
+
         /*
          * Record expectations
          */
@@ -215,15 +216,15 @@
         mockSession.setReturnValue(null);
         nextFilter.filterWrite( session, new WriteRequest( ByteBuffer.wrap( chunk1 ) ) );
         mockNextFilter.setMatcher( new WriteRequestMatcher() );
-        
+
         session.getAttribute( StreamWriteFilter.CURRENT_STREAM );
         mockSession.setReturnValue( stream );
         nextFilter.filterWrite( session, new WriteRequest( ByteBuffer.wrap( chunk2 ) ) );
-        
+
         session.getAttribute( StreamWriteFilter.CURRENT_STREAM );
         mockSession.setReturnValue( stream );
         nextFilter.filterWrite( session, new WriteRequest( ByteBuffer.wrap( chunk3 ) ) );
-        
+
         session.getAttribute( StreamWriteFilter.CURRENT_STREAM );
         mockSession.setReturnValue( stream );
         session.removeAttribute( StreamWriteFilter.CURRENT_STREAM );
@@ -233,34 +234,34 @@
         session.removeAttribute( StreamWriteFilter.WRITE_REQUEST_QUEUE );
         mockSession.setReturnValue( null );
         nextFilter.messageSent( session, stream );
-        
+
         /*
          * Replay.
          */
         mockNextFilter.replay();
         mockSession.replay();
-        
+
         filter.filterWrite( nextFilter, session, writeRequest );
         filter.messageSent( nextFilter, session, chunk1 );
         filter.messageSent( nextFilter, session, chunk2 );
         filter.messageSent( nextFilter, session, chunk3 );
-        
+
         /*
          * Verify.
          */
         mockNextFilter.verify();
         mockSession.verify();
-        
+
         assertTrue( writeRequest.getFuture().isWritten() );
     }
-    
+
     public void testWriteWhileWriteInProgress() throws Exception
     {
         StreamWriteFilter filter = new StreamWriteFilter();
-        
-        Queue queue = new Queue();
+
+        Queue<? extends Object> queue = new LinkedList<Object>( );
         InputStream stream = new ByteArrayInputStream( new byte[ 5 ] );
-        
+
         /*
          * Record expectations
          */
@@ -269,7 +270,7 @@
         mockSession.setReturnValue( stream );
         session.getAttribute( StreamWriteFilter.WRITE_REQUEST_QUEUE );
         mockSession.setReturnValue( queue );
-        
+
         /*
          * Replay.
          */
@@ -279,35 +280,35 @@
         WriteRequest wr = new WriteRequest( new Object(), new DummyWriteFuture() );
         filter.filterWrite( nextFilter, session, wr );
         assertEquals( 1, queue.size() );
-        assertSame( wr, queue.pop() );
-        
+        assertSame( wr, queue.poll() );
+
         /*
          * Verify.
          */
         mockNextFilter.verify();
         mockSession.verify();
     }
-    
+
     public void testWritesWriteRequestQueueWhenFinished() throws Exception
     {
         StreamWriteFilter filter = new StreamWriteFilter();
 
-        WriteRequest wrs[] = new WriteRequest[] { 
+        WriteRequest wrs[] = new WriteRequest[] {
                 new WriteRequest( new Object(), new DummyWriteFuture() ),
                 new WriteRequest( new Object(), new DummyWriteFuture() ),
                 new WriteRequest( new Object(), new DummyWriteFuture() )
         };
-        Queue queue = new Queue();
-        queue.push( wrs[ 0 ] );
-        queue.push( wrs[ 1 ] );
-        queue.push( wrs[ 2 ] );
+        Queue<WriteRequest> queue = new LinkedList<WriteRequest>( );
+        queue.add( wrs[ 0 ] );
+        queue.add( wrs[ 1 ] );
+        queue.add( wrs[ 2 ] );
         InputStream stream = new ByteArrayInputStream( new byte[ 0 ] );
-        
+
         /*
          * Record expectations
          */
         mockSession.reset();
-        
+
         session.getAttribute( StreamWriteFilter.CURRENT_STREAM );
         mockSession.setReturnValue( stream );
         session.removeAttribute( StreamWriteFilter.CURRENT_STREAM );
@@ -316,7 +317,7 @@
         mockSession.setReturnValue( new DefaultWriteFuture( session ) );
         session.removeAttribute( StreamWriteFilter.WRITE_REQUEST_QUEUE );
         mockSession.setReturnValue( queue );
-        
+
         nextFilter.filterWrite( session, wrs[ 0 ] );
         session.getAttribute( StreamWriteFilter.CURRENT_STREAM );
         mockSession.setReturnValue( null );
@@ -326,9 +327,9 @@
         nextFilter.filterWrite( session, wrs[ 2 ] );
         session.getAttribute( StreamWriteFilter.CURRENT_STREAM );
         mockSession.setReturnValue( null );
-        
+
         nextFilter.messageSent( session, stream );
-        
+
         /*
          * Replay.
          */
@@ -337,14 +338,14 @@
 
         filter.messageSent( nextFilter, session, new Object() );
         assertEquals( 0, queue.size() );
-        
+
         /*
          * Verify.
          */
         mockNextFilter.verify();
         mockSession.verify();
-    }    
-    
+    }
+
     /**
      * Tests that {@link StreamWriteFilter#setWriteBufferSize(int)} checks the
      * specified size.
@@ -352,7 +353,7 @@
     public void testSetWriteBufferSize() throws Exception
     {
         StreamWriteFilter filter = new StreamWriteFilter();
-        
+
         try
         {
             filter.setWriteBufferSize( 0 );
@@ -361,7 +362,7 @@
         catch ( IllegalArgumentException iae )
         {
         }
-        
+
         try
         {
             filter.setWriteBufferSize( -100 );
@@ -376,7 +377,7 @@
         filter.setWriteBufferSize( 1024 );
         assertEquals( 1024, filter.getWriteBufferSize() );
     }
-    
+
     public void testWriteUsingSocketTransport() throws Exception
     {
         IoAcceptor acceptor = new SocketAcceptor();
@@ -384,27 +385,27 @@
         SocketAddress address = new InetSocketAddress( "localhost", AvailablePortFinder.getNextAvailable() );
 
         IoConnector connector = new SocketConnector();
-        
+
         FixedRandomInputStream stream = new FixedRandomInputStream( 4 * 1024 * 1024 );
-        
+
         SenderHandler sender = new SenderHandler( stream );
         ReceiverHandler receiver = new ReceiverHandler( stream.size );
-        
+
         acceptor.bind( address, sender );
-        
+
         synchronized( sender.lock )
         {
             synchronized( receiver.lock )
             {
                 connector.connect( address, receiver );
-                
+
                 sender.lock.wait();
                 receiver.lock.wait();
             }
         }
-        
+
         acceptor.unbind( address );
-        
+
         assertEquals( stream.bytesRead, receiver.bytesRead );
         assertEquals( stream.size, receiver.bytesRead );
         byte[] expectedMd5 = stream.digest.digest();
@@ -423,12 +424,13 @@
         Random random = new Random();
         MessageDigest digest;
 
-        public FixedRandomInputStream( long size ) throws Exception
+        FixedRandomInputStream( long size ) throws Exception
         {
             this.size = size;
             digest = MessageDigest.getInstance( "MD5" );
         }
 
+        @Override
         public int read() throws IOException
         {
             if ( isAllWritten() )
@@ -438,7 +440,7 @@
             digest.update( b );
             return b;
         }
-        
+
         public long getBytesRead()
         {
             return bytesRead;
@@ -457,24 +459,27 @@
 
     private static class SenderHandler extends IoHandlerAdapter
     {
-        Object lock = new Object();
+        final Object lock = new Object();
         InputStream inputStream;
         StreamWriteFilter streamWriteFilter = new StreamWriteFilter();
 
-        public SenderHandler( InputStream inputStream )
+        SenderHandler( InputStream inputStream )
         {
             this.inputStream = inputStream;
         }
 
+        @Override
         public void sessionCreated( IoSession session ) throws Exception {
             super.sessionCreated( session );
             session.getFilterChain().addLast( "codec", streamWriteFilter );
         }
 
+        @Override
         public void sessionOpened( IoSession session ) throws Exception {
             session.write( inputStream );
         }
 
+        @Override
         public void exceptionCaught( IoSession session, Throwable cause ) throws Exception
         {
             synchronized( lock )
@@ -483,6 +488,7 @@
             }
         }
 
+        @Override
         public void sessionClosed( IoSession session ) throws Exception
         {
             synchronized( lock )
@@ -491,6 +497,7 @@
             }
         }
 
+        @Override
         public void sessionIdle( IoSession session, IdleStatus status ) throws Exception
         {
             synchronized( lock )
@@ -499,6 +506,7 @@
             }
         }
 
+        @Override
         public void messageSent( IoSession session, Object message ) throws Exception
         {
             if( message == inputStream )
@@ -513,29 +521,32 @@
 
     private static class ReceiverHandler extends IoHandlerAdapter
     {
-        Object lock = new Object();
+        final Object lock = new Object();
         long bytesRead = 0;
         long size = 0;
         MessageDigest digest;
 
-        public ReceiverHandler( long size ) throws Exception
+        ReceiverHandler( long size ) throws Exception
         {
             this.size = size;
             digest = MessageDigest.getInstance( "MD5" );
         }
 
+        @Override
         public void sessionCreated( IoSession session ) throws Exception
         {
             super.sessionCreated(session);
-            
+
             session.setIdleTime( IdleStatus.READER_IDLE, 5 );
         }
 
+        @Override
         public void sessionIdle( IoSession session, IdleStatus status ) throws Exception
         {
             session.close();
         }
 
+        @Override
         public void exceptionCaught( IoSession session, Throwable cause ) throws Exception
         {
             synchronized( lock )
@@ -543,7 +554,8 @@
                 lock.notifyAll();
             }
         }
-        
+
+        @Override
         public void sessionClosed( IoSession session ) throws Exception
         {
             synchronized( lock )
@@ -552,6 +564,7 @@
             }
         }
 
+        @Override
         public void messageReceived( IoSession session, Object message ) throws Exception
         {
             ByteBuffer buf = ( ByteBuffer ) message;
@@ -566,27 +579,28 @@
             }
         }
     }
-    
+
     public static class WriteRequestMatcher extends AbstractMatcher
     {
+        @Override
         protected boolean argumentMatches( Object expected, Object actual )
         {
-            if( expected instanceof WriteRequest && expected instanceof WriteRequest )
+            if( expected instanceof WriteRequest && actual instanceof WriteRequest )
             {
                 WriteRequest w1 = ( WriteRequest ) expected;
                 WriteRequest w2 = ( WriteRequest ) actual;
-                
-                return w1.getMessage().equals( w2.getMessage() ) 
+
+                return w1.getMessage().equals( w2.getMessage() )
                     && w1.getFuture().isWritten() == w2.getFuture().isWritten();
             }
             return super.argumentMatches( expected, actual );
         }
     }
-    
+
     private static class DummyWriteFuture implements WriteFuture
     {
         private boolean written;
-        
+
         public boolean isWritten()
         {
             return written;
@@ -596,7 +610,7 @@
         {
             this.written = written;
         }
-        
+
         public IoSession getSession()
         {
             return null;

Modified: directory/branches/mina/1.2/core/src/test/java/org/apache/mina/filter/codec/textline/TextLineDecoderTest.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/test/java/org/apache/mina/filter/codec/textline/TextLineDecoderTest.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/core/src/test/java/org/apache/mina/filter/codec/textline/TextLineDecoderTest.java (original)
+++ directory/branches/mina/1.2/core/src/test/java/org/apache/mina/filter/codec/textline/TextLineDecoderTest.java Sun Nov  5 11:36:41 2006
@@ -6,26 +6,27 @@
  *  to you under the Apache License, Version 2.0 (the
  *  "License"); you may not use this file except in compliance
  *  with the License.  You may obtain a copy of the License at
- *  
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  *  Unless required by applicable law or agreed to in writing,
  *  software distributed under the License is distributed on an
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License. 
- *  
+ *  under the License.
+ *
  */
 package org.apache.mina.filter.codec.textline;
 
 import java.net.SocketAddress;
 import java.nio.charset.Charset;
 import java.nio.charset.CharsetEncoder;
+import java.util.LinkedList;
+import java.util.Queue;
 
 import junit.framework.Assert;
 import junit.framework.TestCase;
-
 import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.IoFilterChain;
 import org.apache.mina.common.IoHandler;
@@ -36,7 +37,6 @@
 import org.apache.mina.common.TransportType;
 import org.apache.mina.common.support.BaseIoSession;
 import org.apache.mina.filter.codec.ProtocolDecoderOutput;
-import org.apache.mina.util.Queue;
 
 /**
  * Tests {@link TextLineDecoder}.
@@ -56,19 +56,19 @@
         TextLineDecoder decoder =
             new TextLineDecoder(
                     Charset.forName( "UTF-8" ), LineDelimiter.WINDOWS );
-        
+
         CharsetEncoder encoder = Charset.forName( "UTF-8" ).newEncoder();
         IoSession session = new DummySession();
         TestDecoderOutput out = new TestDecoderOutput();
         ByteBuffer in = ByteBuffer.allocate( 16 );
-     
+
         // Test one decode and one output
         in.putString( "ABC\r\n", encoder );
         in.flip();
         decoder.decode( session, in, out );
         Assert.assertEquals( 1, out.getMessageQueue().size() );
-        Assert.assertEquals( "ABC", out.getMessageQueue().pop() );
-        
+        Assert.assertEquals( "ABC", out.getMessageQueue().poll() );
+
         // Test two decode and one output
         in.clear();
         in.putString( "DEF", encoder );
@@ -80,17 +80,17 @@
         in.flip();
         decoder.decode( session, in, out );
         Assert.assertEquals( 1, out.getMessageQueue().size() );
-        Assert.assertEquals( "DEFGHI", out.getMessageQueue().pop() );
-        
+        Assert.assertEquals( "DEFGHI", out.getMessageQueue().poll() );
+
         // Test one decode and two output
         in.clear();
         in.putString( "JKL\r\nMNO\r\n", encoder );
         in.flip();
         decoder.decode( session, in, out );
         Assert.assertEquals( 2, out.getMessageQueue().size() );
-        Assert.assertEquals( "JKL", out.getMessageQueue().pop() );
-        Assert.assertEquals( "MNO", out.getMessageQueue().pop() );
-        
+        Assert.assertEquals( "JKL", out.getMessageQueue().poll() );
+        Assert.assertEquals( "MNO", out.getMessageQueue().poll() );
+
         // Test splitted long delimiter
         decoder = new TextLineDecoder(
                 Charset.forName( "UTF-8" ),
@@ -110,27 +110,27 @@
         in.flip();
         decoder.decode( session, in, out );
         Assert.assertEquals( 1, out.getMessageQueue().size() );
-        Assert.assertEquals( "PQR", out.getMessageQueue().pop() );
+        Assert.assertEquals( "PQR", out.getMessageQueue().poll() );
     }
-    
+
     public void testAutoDecode() throws Exception
     {
         TextLineDecoder decoder =
             new TextLineDecoder(
                     Charset.forName( "UTF-8" ), LineDelimiter.AUTO );
-        
+
         CharsetEncoder encoder = Charset.forName( "UTF-8" ).newEncoder();
         IoSession session = new DummySession();
         TestDecoderOutput out = new TestDecoderOutput();
         ByteBuffer in = ByteBuffer.allocate( 16 );
-     
+
         // Test one decode and one output
         in.putString( "ABC\r\n", encoder );
         in.flip();
         decoder.decode( session, in, out );
         Assert.assertEquals( 1, out.getMessageQueue().size() );
-        Assert.assertEquals( "ABC", out.getMessageQueue().pop() );
-        
+        Assert.assertEquals( "ABC", out.getMessageQueue().poll() );
+
         // Test two decode and one output
         in.clear();
         in.putString( "DEF", encoder );
@@ -142,27 +142,27 @@
         in.flip();
         decoder.decode( session, in, out );
         Assert.assertEquals( 1, out.getMessageQueue().size() );
-        Assert.assertEquals( "DEFGHI", out.getMessageQueue().pop() );
-        
+        Assert.assertEquals( "DEFGHI", out.getMessageQueue().poll() );
+
         // Test one decode and two output
         in.clear();
         in.putString( "JKL\r\nMNO\r\n", encoder );
         in.flip();
         decoder.decode( session, in, out );
         Assert.assertEquals( 2, out.getMessageQueue().size() );
-        Assert.assertEquals( "JKL", out.getMessageQueue().pop() );
-        Assert.assertEquals( "MNO", out.getMessageQueue().pop() );
-        
+        Assert.assertEquals( "JKL", out.getMessageQueue().poll() );
+        Assert.assertEquals( "MNO", out.getMessageQueue().poll() );
+
         // Test multiple '\n's
         in.clear();
         in.putString( "\n\n\n", encoder );
         in.flip();
         decoder.decode( session, in, out );
         Assert.assertEquals( 3, out.getMessageQueue().size() );
-        Assert.assertEquals( "", out.getMessageQueue().pop() );
-        Assert.assertEquals( "", out.getMessageQueue().pop() );
-        Assert.assertEquals( "", out.getMessageQueue().pop() );
-        
+        Assert.assertEquals( "", out.getMessageQueue().poll() );
+        Assert.assertEquals( "", out.getMessageQueue().poll() );
+        Assert.assertEquals( "", out.getMessageQueue().poll() );
+
         // Test splitted long delimiter (\r\r\n)
         in.clear();
         in.putString( "PQR\r", encoder );
@@ -179,11 +179,12 @@
         in.flip();
         decoder.decode( session, in, out );
         Assert.assertEquals( 1, out.getMessageQueue().size() );
-        Assert.assertEquals( "PQR", out.getMessageQueue().pop() );
+        Assert.assertEquals( "PQR", out.getMessageQueue().poll() );
     }
-    
+
     private static class DummySession extends BaseIoSession
     {
+        @Override
         protected void updateTrafficMask()
         {
         }
@@ -192,7 +193,7 @@
         {
             return null;
         }
-        
+
         public IoServiceConfig getServiceConfig()
         {
             return null;
@@ -243,17 +244,17 @@
             return 0;
         }
     }
-    
+
     private static class TestDecoderOutput implements ProtocolDecoderOutput
     {
-        private Queue messageQueue = new Queue();
+        private Queue<Object> messageQueue = new LinkedList<Object>( );
 
         public void write( Object message )
         {
-            messageQueue.push( message );
+            messageQueue.add( message );
         }
-        
-        public Queue getMessageQueue()
+
+        public Queue<Object> getMessageQueue()
         {
             return messageQueue;
         }

Modified: directory/branches/mina/1.2/core/src/test/java/org/apache/mina/filter/codec/textline/TextLineEncoderTest.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/test/java/org/apache/mina/filter/codec/textline/TextLineEncoderTest.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/core/src/test/java/org/apache/mina/filter/codec/textline/TextLineEncoderTest.java (original)
+++ directory/branches/mina/1.2/core/src/test/java/org/apache/mina/filter/codec/textline/TextLineEncoderTest.java Sun Nov  5 11:36:41 2006
@@ -6,16 +6,16 @@
  *  to you under the Apache License, Version 2.0 (the
  *  "License"); you may not use this file except in compliance
  *  with the License.  You may obtain a copy of the License at
- *  
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  *  Unless required by applicable law or agreed to in writing,
  *  software distributed under the License is distributed on an
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License. 
- *  
+ *  under the License.
+ *
  */
 package org.apache.mina.filter.codec.textline;
 
@@ -58,15 +58,16 @@
         SimpleProtocolEncoderOutput out =
             new SimpleProtocolEncoderOutput()
             {
+                @Override
                 protected WriteFuture doFlush( ByteBuffer buf )
                 {
                     return null;
                 }
             };
-        
+
         encoder.encode( session, "ABC", out );
         Assert.assertEquals( 1, out.getBufferQueue().size() );
-        ByteBuffer buf = ( ByteBuffer ) out.getBufferQueue().pop();
+        ByteBuffer buf = out.getBufferQueue().remove(0);
         Assert.assertEquals( 5, buf.remaining() );
         Assert.assertEquals( 'A', buf.get() );
         Assert.assertEquals( 'B', buf.get() );
@@ -77,6 +78,7 @@
 
     private static class DummySession extends BaseIoSession
     {
+        @Override
         protected void updateTrafficMask()
         {
         }

Modified: directory/branches/mina/1.2/example/src/main/java/org/apache/mina/example/httpserver/codec/HttpRequestDecoder.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/example/src/main/java/org/apache/mina/example/httpserver/codec/HttpRequestDecoder.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/example/src/main/java/org/apache/mina/example/httpserver/codec/HttpRequestDecoder.java (original)
+++ directory/branches/mina/1.2/example/src/main/java/org/apache/mina/example/httpserver/codec/HttpRequestDecoder.java Sun Nov  5 11:36:41 2006
@@ -36,7 +36,7 @@
 import org.apache.mina.filter.codec.demux.MessageDecoderResult;
 
 /**
- * A {@link MessageDecoder} that decodes {@link HttpRequest}.
+ * A {@link org.apache.mina.filter.codec.demux.MessageDecoder} that decodes {@link HttpRequest}.
  *
  * @author The Apache Directory Project (mina-dev@directory.apache.org)
  * @version $Rev$, $Date$

Modified: directory/branches/mina/1.2/filter-ssl/src/main/java/org/apache/mina/filter/support/SSLHandler.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/filter-ssl/src/main/java/org/apache/mina/filter/support/SSLHandler.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/filter-ssl/src/main/java/org/apache/mina/filter/support/SSLHandler.java (original)
+++ directory/branches/mina/1.2/filter-ssl/src/main/java/org/apache/mina/filter/support/SSLHandler.java Sun Nov  5 11:36:41 2006
@@ -6,21 +6,22 @@
  *  to you under the Apache License, Version 2.0 (the
  *  "License"); you may not use this file except in compliance
  *  with the License.  You may obtain a copy of the License at
- *  
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  *  Unless required by applicable law or agreed to in writing,
  *  software distributed under the License is distributed on an
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License. 
- *  
+ *  under the License.
+ *
  */
 package org.apache.mina.filter.support;
 
 import java.nio.ByteBuffer;
-
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLEngineResult;
@@ -28,13 +29,12 @@
 import javax.net.ssl.SSLHandshakeException;
 import javax.net.ssl.SSLSession;
 
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.WriteFuture;
 import org.apache.mina.common.IoFilter.NextFilter;
 import org.apache.mina.common.IoFilter.WriteRequest;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.WriteFuture;
 import org.apache.mina.common.support.DefaultWriteFuture;
 import org.apache.mina.filter.SSLFilter;
-import org.apache.mina.util.Queue;
 import org.apache.mina.util.SessionLog;
 
 /**
@@ -53,7 +53,7 @@
     private final SSLFilter parent;
     private final SSLContext ctx;
     private final IoSession session;
-    private final Queue scheduledWrites = new Queue();
+    private final Queue<ScheduledWrite> scheduledWrites = new ConcurrentLinkedQueue<ScheduledWrite>( );
 
     private SSLEngine sslEngine;
 
@@ -88,12 +88,12 @@
     private boolean initialHandshakeComplete;
 
     private boolean writingEncryptedData;
-    
+
     /**
      * Constuctor.
      *
      * @param sslc
-     * @throws SSLException 
+     * @throws SSLException
      */
     public SSLHandler( SSLFilter parent, SSLContext sslc, IoSession session ) throws SSLException
     {
@@ -122,21 +122,21 @@
         {
             sslEngine.setNeedClientAuth( true );
         }
-  
+
         if( parent.getEnabledCipherSuites() != null )
         {
             sslEngine.setEnabledCipherSuites( parent.getEnabledCipherSuites() );
         }
-        
+
         if( parent.getEnabledProtocols() != null )
         {
             sslEngine.setEnabledProtocols( parent.getEnabledProtocols() );
         }
 
-        sslEngine.beginHandshake();   
+        sslEngine.beginHandshake();
         initialHandshakeStatus = sslEngine.getHandshakeStatus();//SSLEngineResult.HandshakeStatus.NEED_UNWRAP;
         initialHandshakeComplete = false;
-        
+
         SSLByteBufferPool.initiate( sslEngine );
 
         appBuffer = SSLByteBufferPool.getApplicationBuffer();
@@ -145,10 +145,10 @@
         outNetBuffer = SSLByteBufferPool.getPacketBuffer();
         outNetBuffer.position( 0 );
         outNetBuffer.limit( 0 );
-        
+
         writingEncryptedData = false;
     }
-    
+
     /**
      * Release allocated ByteBuffers.
      */
@@ -171,7 +171,7 @@
                     "Unexpected exception from SSLEngine.closeInbound().",
                     e );
         }
-        
+
         try
         {
             do
@@ -189,7 +189,7 @@
         }
         sslEngine.closeOutbound();
         sslEngine = null;
-        
+
         SSLByteBufferPool.release( appBuffer );
         SSLByteBufferPool.release( inNetBuffer );
         SSLByteBufferPool.release( outNetBuffer );
@@ -200,7 +200,7 @@
     {
         return parent;
     }
-    
+
     public IoSession getSession()
     {
         return session;
@@ -239,17 +239,17 @@
     {
         return ( initialHandshakeStatus == SSLEngineResult.HandshakeStatus.NEED_WRAP && !isInboundDone() );
     }
-    
+
     public void scheduleWrite( NextFilter nextFilter, WriteRequest writeRequest )
     {
-        scheduledWrites.push( new ScheduledWrite( nextFilter, writeRequest ) );
+        scheduledWrites.add( new ScheduledWrite( nextFilter, writeRequest ) );
     }
-    
+
     public void flushScheduledWrites() throws SSLException
     {
         ScheduledWrite scheduledWrite;
-        
-        while( ( scheduledWrite = ( ScheduledWrite ) scheduledWrites.pop() ) != null )
+
+        while( ( scheduledWrite = scheduledWrites.poll() ) != null )
         {
             if( SessionLog.isDebugEnabled( session ) )
             {
@@ -279,9 +279,9 @@
             appBuffer.limit( 0 );
             if( SessionLog.isDebugEnabled( session ) )
             {
-                SessionLog.debug( session, 
+                SessionLog.debug( session,
                                     " expanded inNetBuffer:" + inNetBuffer );
-                SessionLog.debug( session, 
+                SessionLog.debug( session,
                                     " expanded appBuffer:" + appBuffer );
             }
         }
@@ -342,8 +342,6 @@
         // buffer.
         outNetBuffer.clear();
 
-        SSLEngineResult result;
-
         // Loop until there is no more data in src
         while ( src.hasRemaining() ) {
 
@@ -357,7 +355,7 @@
                 }
             }
 
-            result = sslEngine.wrap( src, outNetBuffer );
+            SSLEngineResult result = sslEngine.wrap( src, outNetBuffer );
             if ( SessionLog.isDebugEnabled( session ) ) {
                 SessionLog.debug( session, " Wrap res:" + result );
             }
@@ -378,7 +376,7 @@
 
     /**
      * Start SSL shutdown process.
-     * 
+     *
      * @return <tt>true</tt> if shutdown process is started.
      *         <tt>false</tt> if shutdown process is already finished.
      *
@@ -390,7 +388,7 @@
         {
             return false;
         }
-        
+
         sslEngine.closeOutbound();
 
         // By RFC 2616, we can "fire and forget" our close_notify
@@ -444,10 +442,10 @@
                                     status +
                                     " inNetBuffer: " + inNetBuffer + "appBuffer: " + appBuffer);
         }
-        
+
         return status;
     }
-    
+
     /**
      * Perform any handshaking processing.
      */
@@ -457,7 +455,7 @@
         {
             SessionLog.debug( session, " doHandshake()" );
         }
-        
+
         while( !initialHandshakeComplete )
         {
             if( initialHandshakeStatus == SSLEngineResult.HandshakeStatus.FINISHED )
@@ -492,7 +490,7 @@
                     SessionLog.debug( session, "  initialHandshakeStatus=NEED_UNWRAP" );
                 }
                 SSLEngineResult.Status status = unwrapHandshake();
-                if( ( initialHandshakeStatus != SSLEngineResult.HandshakeStatus.FINISHED 
+                if( ( initialHandshakeStatus != SSLEngineResult.HandshakeStatus.FINISHED
                         && status == SSLEngineResult.Status.BUFFER_UNDERFLOW )
                         || isInboundDone() )
                 {
@@ -543,15 +541,14 @@
             // no; bail out
             return DefaultWriteFuture.newNotWrittenFuture( session );
         }
-        
-        WriteFuture writeFuture = null;
-        
-        // write net data
-        
+
         // set flag that we are writing encrypted data
         // (used in SSLFilter.filterWrite())
         writingEncryptedData = true;
-        
+
+        // write net data
+        WriteFuture writeFuture = null;
+
         try
         {
             if( SessionLog.isDebugEnabled( session ) )
@@ -564,7 +561,7 @@
                 SessionLog.debug( session, " session write: " + writeBuffer );
             }
             //debug("outNetBuffer (after copy): {0}", sslHandler.getOutNetBuffer());
-            
+
             writeFuture = new DefaultWriteFuture( session );
             parent.filterWrite( nextFilter, session, new WriteRequest( writeBuffer, writeFuture ) );
 
@@ -598,18 +595,10 @@
         {
             writingEncryptedData = false;
         }
-        
-        if( writeFuture != null )
-        {
-            return writeFuture;
-        }
-        else
-        {
-            return DefaultWriteFuture.newNotWrittenFuture( session );
-        }
+
+        return writeFuture;
     }
-    
-    
+
     private SSLEngineResult.Status unwrap() throws SSLException
     {
         if( SessionLog.isDebugEnabled( session ) )
@@ -685,7 +674,7 @@
                res.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_UNWRAP );
 
         initialHandshakeStatus = res.getHandshakeStatus();
-    
+
         // If handshake finished, no data was produced, and the status is still ok,
         // try to unwrap more
         if (initialHandshakeStatus == SSLEngineResult.HandshakeStatus.FINISHED
@@ -758,14 +747,14 @@
     {
         private final NextFilter nextFilter;
         private final WriteRequest writeRequest;
-        
-        public ScheduledWrite( NextFilter nextFilter, WriteRequest writeRequest )
+
+        ScheduledWrite( NextFilter nextFilter, WriteRequest writeRequest )
         {
             this.nextFilter = nextFilter;
             this.writeRequest = writeRequest;
         }
     }
-    
+
     /**
      * Creates a new Mina byte buffer that is a deep copy of the remaining bytes
      * in the given buffer (between index buf.position() and buf.limit())