You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2006/11/20 08:06:40 UTC

svn commit: r477082 - in /mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio: SocketAcceptor.java SocketConnector.java SocketIoProcessor.java support/DatagramAcceptorDelegate.java support/DatagramConnectorDelegate.java

Author: trustin
Date: Sun Nov 19 23:06:39 2006
New Revision: 477082

URL: http://svn.apache.org/viewvc?view=rev&rev=477082
Log:
Replaced LinkedList with ConcurrentLinkedQueue where can get benefit from it.

Modified:
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java?view=diff&rev=477082&r1=477081&r2=477082
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java Sun Nov 19 23:06:39 2006
@@ -28,9 +28,9 @@
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 
 import org.apache.mina.common.ExceptionMonitor;
@@ -66,8 +66,8 @@
     private final String threadName = "SocketAcceptor-" + id;
 
     private ServerSocketChannel serverSocketChannel;
-    private final Queue<RegistrationRequest> registerQueue = new LinkedList<RegistrationRequest>();
-    private final Queue<CancellationRequest> cancelQueue = new LinkedList<CancellationRequest>();
+    private final Queue<RegistrationRequest> registerQueue = new ConcurrentLinkedQueue<RegistrationRequest>();
+    private final Queue<CancellationRequest> cancelQueue = new ConcurrentLinkedQueue<CancellationRequest>();
 
     private final SocketIoProcessor[] ioProcessors;
     private final int processorCount;
@@ -195,10 +195,7 @@
     {
         RegistrationRequest request = new RegistrationRequest();
 
-        synchronized( registerQueue )
-        {
-            registerQueue.offer( request );
-        }
+        registerQueue.offer( request );
 
         startupWorker();
 
@@ -276,11 +273,7 @@
             throw new IllegalArgumentException( "Address not bound: " + getLocalAddress() );
         }
 
-        synchronized( cancelQueue )
-        {
-            cancelQueue.offer( request );
-        }
-
+        cancelQueue.offer( request );
         selector.wakeup();
 
         synchronized( request )
@@ -424,20 +417,9 @@
 
     private void registerNew()
     {
-        if( registerQueue.isEmpty() )
-        {
-            return;
-        }
-
         for( ; ; )
         {
-            RegistrationRequest req;
-
-            synchronized( registerQueue )
-            {
-                req = registerQueue.poll();
-            }
-
+            RegistrationRequest req = registerQueue.poll();
             if( req == null )
             {
                 break;
@@ -495,20 +477,9 @@
 
     private void cancelKeys()
     {
-        if( cancelQueue.isEmpty() )
-        {
-            return;
-        }
-
         for( ; ; )
         {
-            CancellationRequest request;
-
-            synchronized( cancelQueue )
-            {
-                request = cancelQueue.poll();
-            }
-
+            CancellationRequest request = cancelQueue.poll();
             if( request == null )
             {
                 break;

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java?view=diff&rev=477082&r1=477081&r2=477082
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java Sun Nov 19 23:06:39 2006
@@ -27,9 +27,9 @@
 import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 
 import org.apache.mina.common.ConnectFuture;
@@ -61,7 +61,7 @@
     private final Object lock = new Object();
     private final int id = nextId++;
     private final String threadName = "SocketConnector-" + id;
-    private final Queue<ConnectionRequest> connectQueue = new LinkedList<ConnectionRequest>();
+    private final Queue<ConnectionRequest> connectQueue = new ConcurrentLinkedQueue<ConnectionRequest>();
     private final SocketIoProcessor[] ioProcessors;
     private final int processorCount;
     private final Executor executor;
@@ -205,10 +205,7 @@
             }
         }
 
-        synchronized( connectQueue )
-        {
-            connectQueue.offer( request );
-        }
+        connectQueue.offer( request );
         selector.wakeup();
 
         return request;
@@ -226,17 +223,9 @@
 
     private void registerNew()
     {
-        if( connectQueue.isEmpty() )
-            return;
-
         for( ; ; )
         {
-            ConnectionRequest req;
-            synchronized( connectQueue )
-            {
-                req = connectQueue.poll();
-            }
-
+            ConnectionRequest req = connectQueue.poll();
             if( req == null )
                 break;
 

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java?view=diff&rev=477082&r1=477081&r2=477082
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java Sun Nov 19 23:06:39 2006
@@ -24,9 +24,9 @@
 import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 
 import org.apache.mina.common.ByteBuffer;
@@ -56,10 +56,10 @@
      */
     private Selector selector;
 
-    private final Queue<SocketSessionImpl> newSessions = new LinkedList<SocketSessionImpl>();
-    private final Queue<SocketSessionImpl> removingSessions = new LinkedList<SocketSessionImpl>();
-    private final Queue<SocketSessionImpl> flushingSessions = new LinkedList<SocketSessionImpl>();
-    private final Queue<SocketSessionImpl> trafficControllingSessions = new LinkedList<SocketSessionImpl>();
+    private final Queue<SocketSessionImpl> newSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
+    private final Queue<SocketSessionImpl> removingSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
+    private final Queue<SocketSessionImpl> flushingSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
+    private final Queue<SocketSessionImpl> trafficControllingSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
 
     private Worker worker;
     private long lastIdleCheckTime = System.currentTimeMillis();
@@ -72,10 +72,7 @@
 
     void addNew( SocketSessionImpl session ) throws IOException
     {
-        synchronized( newSessions )
-        {
-            newSessions.offer( session );
-        }
+        newSessions.offer( session );
 
         startupWorker();
 
@@ -124,41 +121,24 @@
 
     private void scheduleRemove( SocketSessionImpl session )
     {
-        synchronized( removingSessions )
-        {
-            removingSessions.offer( session );
-        }
+        removingSessions.offer( session );
     }
 
     private void scheduleFlush( SocketSessionImpl session )
     {
-        synchronized( flushingSessions )
-        {
-            flushingSessions.offer( session );
-        }
+        flushingSessions.offer( session );
     }
 
     private void scheduleTrafficControl( SocketSessionImpl session )
     {
-        synchronized( trafficControllingSessions )
-        {
-            trafficControllingSessions.offer( session );
-        }
+        trafficControllingSessions.offer( session );
     }
 
     private void doAddNew()
     {
-        if( newSessions.isEmpty() )
-            return;
-
         for( ; ; )
         {
-            SocketSessionImpl session;
-
-            synchronized( newSessions )
-            {
-                session = newSessions.poll();
-            }
+            SocketSessionImpl session = newSessions.poll();
 
             if( session == null )
                 break;
@@ -199,17 +179,9 @@
 
     private void doRemove()
     {
-        if( removingSessions.isEmpty() )
-            return;
-
         for( ; ; )
         {
-            SocketSessionImpl session;
-
-            synchronized( removingSessions )
-            {
-                session = removingSessions.poll();
-            }
+            SocketSessionImpl session = removingSessions.poll();
 
             if( session == null )
                 break;
@@ -393,12 +365,7 @@
 
         for( ; ; )
         {
-            SocketSessionImpl session;
-
-            synchronized( flushingSessions )
-            {
-                session = flushingSessions.poll();
-            }
+            SocketSessionImpl session = flushingSessions.poll();
 
             if( session == null )
                 break;
@@ -511,17 +478,11 @@
 
     private void doUpdateTrafficMask()
     {
-        if( trafficControllingSessions.isEmpty() )
-            return;
-
         for( ; ; )
         {
             SocketSessionImpl session;
 
-            synchronized( trafficControllingSessions )
-            {
-                session = trafficControllingSessions.poll();
-            }
+            session = trafficControllingSessions.poll();
 
             if( session == null )
                 break;

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java?view=diff&rev=477082&r1=477081&r2=477082
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java Sun Nov 19 23:06:39 2006
@@ -26,9 +26,9 @@
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 
 import org.apache.mina.common.ByteBuffer;
@@ -64,9 +64,9 @@
     private final int id = nextId ++ ;
     private Selector selector;
     private DatagramChannel channel;
-    private final Queue<RegistrationRequest> registerQueue = new LinkedList<RegistrationRequest>();
-    private final Queue<CancellationRequest> cancelQueue = new LinkedList<CancellationRequest>();
-    private final Queue<DatagramSessionImpl> flushingSessions = new LinkedList<DatagramSessionImpl>();
+    private final Queue<RegistrationRequest> registerQueue = new ConcurrentLinkedQueue<RegistrationRequest>();
+    private final Queue<CancellationRequest> cancelQueue = new ConcurrentLinkedQueue<CancellationRequest>();
+    private final Queue<DatagramSessionImpl> flushingSessions = new ConcurrentLinkedQueue<DatagramSessionImpl>();
     private Worker worker;
     
     /**
@@ -88,10 +88,7 @@
         RegistrationRequest request = new RegistrationRequest();
         synchronized( this )
         {
-            synchronized( registerQueue )
-            {
-                registerQueue.offer( request );
-            }
+            registerQueue.offer( request );
             startupWorker();
         }
         selector.wakeup();
@@ -138,10 +135,7 @@
                 throw new IllegalArgumentException( "Address not bound: " + getLocalAddress() );
             }
 
-            synchronized( cancelQueue )
-            {
-                cancelQueue.offer( request );
-            }
+            cancelQueue.offer( request );
         }
         selector.wakeup();
         
@@ -287,10 +281,7 @@
 
     private void scheduleFlush( DatagramSessionImpl session )
     {
-        synchronized( flushingSessions )
-        {
-            flushingSessions.offer( session );
-        }
+        flushingSessions.offer( session );
     }
 
     private class Worker implements Runnable
@@ -421,18 +412,9 @@
 
     private void flushSessions()
     {
-        if( flushingSessions.size() == 0 )
-            return;
-
         for( ;; )
         {
-            DatagramSessionImpl session;
-
-            synchronized( flushingSessions )
-            {
-                session = flushingSessions.poll();
-            }
-
+            DatagramSessionImpl session = flushingSessions.poll();
             if( session == null )
                 break;
 
@@ -529,12 +511,7 @@
 
         for( ;; )
         {
-            RegistrationRequest req;
-            synchronized( registerQueue )
-            {
-                req = registerQueue.poll();
-            }
-
+            RegistrationRequest req = registerQueue.poll();
             if( req == null )
                 break;
 
@@ -590,17 +567,9 @@
 
     private void cancelKeys()
     {
-        if( cancelQueue.isEmpty() )
-            return;
-
         for( ;; )
         {
-            CancellationRequest request;
-            synchronized( cancelQueue )
-            {
-                request = cancelQueue.poll();
-            }
-            
+            CancellationRequest request = cancelQueue.poll();
             if( request == null )
             {
                 break;

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java?view=diff&rev=477082&r1=477081&r2=477082
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java Sun Nov 19 23:06:39 2006
@@ -26,9 +26,9 @@
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 
 import org.apache.mina.common.ByteBuffer;
@@ -65,10 +65,10 @@
     private final Executor executor;
     private final int id = nextId ++ ;
     private Selector selector;
-    private final Queue<RegistrationRequest> registerQueue = new LinkedList<RegistrationRequest>();
-    private final Queue<DatagramSessionImpl> cancelQueue = new LinkedList<DatagramSessionImpl>();
-    private final Queue<DatagramSessionImpl> flushingSessions = new LinkedList<DatagramSessionImpl>();
-    private final Queue<DatagramSessionImpl> trafficControllingSessions = new LinkedList<DatagramSessionImpl>();
+    private final Queue<RegistrationRequest> registerQueue = new ConcurrentLinkedQueue<RegistrationRequest>();
+    private final Queue<DatagramSessionImpl> cancelQueue = new ConcurrentLinkedQueue<DatagramSessionImpl>();
+    private final Queue<DatagramSessionImpl> flushingSessions = new ConcurrentLinkedQueue<DatagramSessionImpl>();
+    private final Queue<DatagramSessionImpl> trafficControllingSessions = new ConcurrentLinkedQueue<DatagramSessionImpl>();
     private Worker worker;
 
     /**
@@ -154,10 +154,7 @@
                 return DefaultConnectFuture.newFailedFuture( e );
             }
             
-            synchronized( registerQueue )
-            {
-                registerQueue.offer( request );
-            }
+            registerQueue.offer( request );
         }
 
         selector.wakeup();
@@ -212,10 +209,7 @@
                 return;
             }
 
-            synchronized( cancelQueue )
-            {
-                cancelQueue.offer( session );
-            }
+            cancelQueue.offer( session );
         }
 
         selector.wakeup();
@@ -233,10 +227,7 @@
 
     private void scheduleFlush( DatagramSessionImpl session )
     {
-        synchronized( flushingSessions )
-        {
-            flushingSessions.offer( session );
-        }
+        flushingSessions.offer( session );
     }
 
     public void updateTrafficMask( DatagramSessionImpl session )
@@ -252,26 +243,14 @@
     
     private void scheduleTrafficControl( DatagramSessionImpl session )
     {
-        synchronized( trafficControllingSessions )
-        {
-            trafficControllingSessions.offer( session );
-        }
+        trafficControllingSessions.offer( session );
     }
     
     private void doUpdateTrafficMask() 
     {
-        if( trafficControllingSessions.isEmpty() )
-            return;
-
         for( ;; )
         {
-            DatagramSessionImpl session;
-
-            synchronized( trafficControllingSessions )
-            {
-                session = trafficControllingSessions.poll();
-            }
-
+            DatagramSessionImpl session = trafficControllingSessions.poll();
             if( session == null )
                 break;
 
@@ -456,18 +435,9 @@
 
     private void flushSessions()
     {
-        if( flushingSessions.size() == 0 )
-            return;
-
         for( ;; )
         {
-            DatagramSessionImpl session;
-
-            synchronized( flushingSessions )
-            {
-                session = flushingSessions.poll();
-            }
-
+            DatagramSessionImpl session = flushingSessions.poll();
             if( session == null )
                 break;
 
@@ -553,17 +523,9 @@
 
     private void registerNew()
     {
-        if( registerQueue.isEmpty() )
-            return;
-
         for( ;; )
         {
-            RegistrationRequest req;
-            synchronized( registerQueue )
-            {
-                req = registerQueue.poll();
-            }
-
+            RegistrationRequest req = registerQueue.poll();
             if( req == null )
                 break;
 
@@ -625,17 +587,9 @@
 
     private void cancelKeys()
     {
-        if( cancelQueue.isEmpty() )
-            return;
-
         for( ;; )
         {
-            DatagramSessionImpl session;
-            synchronized( cancelQueue )
-            {
-                session = cancelQueue.poll();
-            }
-
+            DatagramSessionImpl session = cancelQueue.poll();
             if( session == null )
                 break;
             else