You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2006/11/20 08:06:40 UTC
svn commit: r477082 - in
/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio:
SocketAcceptor.java SocketConnector.java SocketIoProcessor.java
support/DatagramAcceptorDelegate.java support/DatagramConnectorDelegate.java
Author: trustin
Date: Sun Nov 19 23:06:39 2006
New Revision: 477082
URL: http://svn.apache.org/viewvc?view=rev&rev=477082
Log:
Replaced LinkedList with ConcurrentLinkedQueue where can get benefit from it.
Modified:
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java?view=diff&rev=477082&r1=477081&r2=477082
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java Sun Nov 19 23:06:39 2006
@@ -28,9 +28,9 @@
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import org.apache.mina.common.ExceptionMonitor;
@@ -66,8 +66,8 @@
private final String threadName = "SocketAcceptor-" + id;
private ServerSocketChannel serverSocketChannel;
- private final Queue<RegistrationRequest> registerQueue = new LinkedList<RegistrationRequest>();
- private final Queue<CancellationRequest> cancelQueue = new LinkedList<CancellationRequest>();
+ private final Queue<RegistrationRequest> registerQueue = new ConcurrentLinkedQueue<RegistrationRequest>();
+ private final Queue<CancellationRequest> cancelQueue = new ConcurrentLinkedQueue<CancellationRequest>();
private final SocketIoProcessor[] ioProcessors;
private final int processorCount;
@@ -195,10 +195,7 @@
{
RegistrationRequest request = new RegistrationRequest();
- synchronized( registerQueue )
- {
- registerQueue.offer( request );
- }
+ registerQueue.offer( request );
startupWorker();
@@ -276,11 +273,7 @@
throw new IllegalArgumentException( "Address not bound: " + getLocalAddress() );
}
- synchronized( cancelQueue )
- {
- cancelQueue.offer( request );
- }
-
+ cancelQueue.offer( request );
selector.wakeup();
synchronized( request )
@@ -424,20 +417,9 @@
private void registerNew()
{
- if( registerQueue.isEmpty() )
- {
- return;
- }
-
for( ; ; )
{
- RegistrationRequest req;
-
- synchronized( registerQueue )
- {
- req = registerQueue.poll();
- }
-
+ RegistrationRequest req = registerQueue.poll();
if( req == null )
{
break;
@@ -495,20 +477,9 @@
private void cancelKeys()
{
- if( cancelQueue.isEmpty() )
- {
- return;
- }
-
for( ; ; )
{
- CancellationRequest request;
-
- synchronized( cancelQueue )
- {
- request = cancelQueue.poll();
- }
-
+ CancellationRequest request = cancelQueue.poll();
if( request == null )
{
break;
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java?view=diff&rev=477082&r1=477081&r2=477082
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java Sun Nov 19 23:06:39 2006
@@ -27,9 +27,9 @@
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import org.apache.mina.common.ConnectFuture;
@@ -61,7 +61,7 @@
private final Object lock = new Object();
private final int id = nextId++;
private final String threadName = "SocketConnector-" + id;
- private final Queue<ConnectionRequest> connectQueue = new LinkedList<ConnectionRequest>();
+ private final Queue<ConnectionRequest> connectQueue = new ConcurrentLinkedQueue<ConnectionRequest>();
private final SocketIoProcessor[] ioProcessors;
private final int processorCount;
private final Executor executor;
@@ -205,10 +205,7 @@
}
}
- synchronized( connectQueue )
- {
- connectQueue.offer( request );
- }
+ connectQueue.offer( request );
selector.wakeup();
return request;
@@ -226,17 +223,9 @@
private void registerNew()
{
- if( connectQueue.isEmpty() )
- return;
-
for( ; ; )
{
- ConnectionRequest req;
- synchronized( connectQueue )
- {
- req = connectQueue.poll();
- }
-
+ ConnectionRequest req = connectQueue.poll();
if( req == null )
break;
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java?view=diff&rev=477082&r1=477081&r2=477082
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java Sun Nov 19 23:06:39 2006
@@ -24,9 +24,9 @@
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import org.apache.mina.common.ByteBuffer;
@@ -56,10 +56,10 @@
*/
private Selector selector;
- private final Queue<SocketSessionImpl> newSessions = new LinkedList<SocketSessionImpl>();
- private final Queue<SocketSessionImpl> removingSessions = new LinkedList<SocketSessionImpl>();
- private final Queue<SocketSessionImpl> flushingSessions = new LinkedList<SocketSessionImpl>();
- private final Queue<SocketSessionImpl> trafficControllingSessions = new LinkedList<SocketSessionImpl>();
+ private final Queue<SocketSessionImpl> newSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
+ private final Queue<SocketSessionImpl> removingSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
+ private final Queue<SocketSessionImpl> flushingSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
+ private final Queue<SocketSessionImpl> trafficControllingSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
private Worker worker;
private long lastIdleCheckTime = System.currentTimeMillis();
@@ -72,10 +72,7 @@
void addNew( SocketSessionImpl session ) throws IOException
{
- synchronized( newSessions )
- {
- newSessions.offer( session );
- }
+ newSessions.offer( session );
startupWorker();
@@ -124,41 +121,24 @@
private void scheduleRemove( SocketSessionImpl session )
{
- synchronized( removingSessions )
- {
- removingSessions.offer( session );
- }
+ removingSessions.offer( session );
}
private void scheduleFlush( SocketSessionImpl session )
{
- synchronized( flushingSessions )
- {
- flushingSessions.offer( session );
- }
+ flushingSessions.offer( session );
}
private void scheduleTrafficControl( SocketSessionImpl session )
{
- synchronized( trafficControllingSessions )
- {
- trafficControllingSessions.offer( session );
- }
+ trafficControllingSessions.offer( session );
}
private void doAddNew()
{
- if( newSessions.isEmpty() )
- return;
-
for( ; ; )
{
- SocketSessionImpl session;
-
- synchronized( newSessions )
- {
- session = newSessions.poll();
- }
+ SocketSessionImpl session = newSessions.poll();
if( session == null )
break;
@@ -199,17 +179,9 @@
private void doRemove()
{
- if( removingSessions.isEmpty() )
- return;
-
for( ; ; )
{
- SocketSessionImpl session;
-
- synchronized( removingSessions )
- {
- session = removingSessions.poll();
- }
+ SocketSessionImpl session = removingSessions.poll();
if( session == null )
break;
@@ -393,12 +365,7 @@
for( ; ; )
{
- SocketSessionImpl session;
-
- synchronized( flushingSessions )
- {
- session = flushingSessions.poll();
- }
+ SocketSessionImpl session = flushingSessions.poll();
if( session == null )
break;
@@ -511,17 +478,11 @@
private void doUpdateTrafficMask()
{
- if( trafficControllingSessions.isEmpty() )
- return;
-
for( ; ; )
{
SocketSessionImpl session;
- synchronized( trafficControllingSessions )
- {
- session = trafficControllingSessions.poll();
- }
+ session = trafficControllingSessions.poll();
if( session == null )
break;
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java?view=diff&rev=477082&r1=477081&r2=477082
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java Sun Nov 19 23:06:39 2006
@@ -26,9 +26,9 @@
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import org.apache.mina.common.ByteBuffer;
@@ -64,9 +64,9 @@
private final int id = nextId ++ ;
private Selector selector;
private DatagramChannel channel;
- private final Queue<RegistrationRequest> registerQueue = new LinkedList<RegistrationRequest>();
- private final Queue<CancellationRequest> cancelQueue = new LinkedList<CancellationRequest>();
- private final Queue<DatagramSessionImpl> flushingSessions = new LinkedList<DatagramSessionImpl>();
+ private final Queue<RegistrationRequest> registerQueue = new ConcurrentLinkedQueue<RegistrationRequest>();
+ private final Queue<CancellationRequest> cancelQueue = new ConcurrentLinkedQueue<CancellationRequest>();
+ private final Queue<DatagramSessionImpl> flushingSessions = new ConcurrentLinkedQueue<DatagramSessionImpl>();
private Worker worker;
/**
@@ -88,10 +88,7 @@
RegistrationRequest request = new RegistrationRequest();
synchronized( this )
{
- synchronized( registerQueue )
- {
- registerQueue.offer( request );
- }
+ registerQueue.offer( request );
startupWorker();
}
selector.wakeup();
@@ -138,10 +135,7 @@
throw new IllegalArgumentException( "Address not bound: " + getLocalAddress() );
}
- synchronized( cancelQueue )
- {
- cancelQueue.offer( request );
- }
+ cancelQueue.offer( request );
}
selector.wakeup();
@@ -287,10 +281,7 @@
private void scheduleFlush( DatagramSessionImpl session )
{
- synchronized( flushingSessions )
- {
- flushingSessions.offer( session );
- }
+ flushingSessions.offer( session );
}
private class Worker implements Runnable
@@ -421,18 +412,9 @@
private void flushSessions()
{
- if( flushingSessions.size() == 0 )
- return;
-
for( ;; )
{
- DatagramSessionImpl session;
-
- synchronized( flushingSessions )
- {
- session = flushingSessions.poll();
- }
-
+ DatagramSessionImpl session = flushingSessions.poll();
if( session == null )
break;
@@ -529,12 +511,7 @@
for( ;; )
{
- RegistrationRequest req;
- synchronized( registerQueue )
- {
- req = registerQueue.poll();
- }
-
+ RegistrationRequest req = registerQueue.poll();
if( req == null )
break;
@@ -590,17 +567,9 @@
private void cancelKeys()
{
- if( cancelQueue.isEmpty() )
- return;
-
for( ;; )
{
- CancellationRequest request;
- synchronized( cancelQueue )
- {
- request = cancelQueue.poll();
- }
-
+ CancellationRequest request = cancelQueue.poll();
if( request == null )
{
break;
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java?view=diff&rev=477082&r1=477081&r2=477082
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java Sun Nov 19 23:06:39 2006
@@ -26,9 +26,9 @@
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import org.apache.mina.common.ByteBuffer;
@@ -65,10 +65,10 @@
private final Executor executor;
private final int id = nextId ++ ;
private Selector selector;
- private final Queue<RegistrationRequest> registerQueue = new LinkedList<RegistrationRequest>();
- private final Queue<DatagramSessionImpl> cancelQueue = new LinkedList<DatagramSessionImpl>();
- private final Queue<DatagramSessionImpl> flushingSessions = new LinkedList<DatagramSessionImpl>();
- private final Queue<DatagramSessionImpl> trafficControllingSessions = new LinkedList<DatagramSessionImpl>();
+ private final Queue<RegistrationRequest> registerQueue = new ConcurrentLinkedQueue<RegistrationRequest>();
+ private final Queue<DatagramSessionImpl> cancelQueue = new ConcurrentLinkedQueue<DatagramSessionImpl>();
+ private final Queue<DatagramSessionImpl> flushingSessions = new ConcurrentLinkedQueue<DatagramSessionImpl>();
+ private final Queue<DatagramSessionImpl> trafficControllingSessions = new ConcurrentLinkedQueue<DatagramSessionImpl>();
private Worker worker;
/**
@@ -154,10 +154,7 @@
return DefaultConnectFuture.newFailedFuture( e );
}
- synchronized( registerQueue )
- {
- registerQueue.offer( request );
- }
+ registerQueue.offer( request );
}
selector.wakeup();
@@ -212,10 +209,7 @@
return;
}
- synchronized( cancelQueue )
- {
- cancelQueue.offer( session );
- }
+ cancelQueue.offer( session );
}
selector.wakeup();
@@ -233,10 +227,7 @@
private void scheduleFlush( DatagramSessionImpl session )
{
- synchronized( flushingSessions )
- {
- flushingSessions.offer( session );
- }
+ flushingSessions.offer( session );
}
public void updateTrafficMask( DatagramSessionImpl session )
@@ -252,26 +243,14 @@
private void scheduleTrafficControl( DatagramSessionImpl session )
{
- synchronized( trafficControllingSessions )
- {
- trafficControllingSessions.offer( session );
- }
+ trafficControllingSessions.offer( session );
}
private void doUpdateTrafficMask()
{
- if( trafficControllingSessions.isEmpty() )
- return;
-
for( ;; )
{
- DatagramSessionImpl session;
-
- synchronized( trafficControllingSessions )
- {
- session = trafficControllingSessions.poll();
- }
-
+ DatagramSessionImpl session = trafficControllingSessions.poll();
if( session == null )
break;
@@ -456,18 +435,9 @@
private void flushSessions()
{
- if( flushingSessions.size() == 0 )
- return;
-
for( ;; )
{
- DatagramSessionImpl session;
-
- synchronized( flushingSessions )
- {
- session = flushingSessions.poll();
- }
-
+ DatagramSessionImpl session = flushingSessions.poll();
if( session == null )
break;
@@ -553,17 +523,9 @@
private void registerNew()
{
- if( registerQueue.isEmpty() )
- return;
-
for( ;; )
{
- RegistrationRequest req;
- synchronized( registerQueue )
- {
- req = registerQueue.poll();
- }
-
+ RegistrationRequest req = registerQueue.poll();
if( req == null )
break;
@@ -625,17 +587,9 @@
private void cancelKeys()
{
- if( cancelQueue.isEmpty() )
- return;
-
for( ;; )
{
- DatagramSessionImpl session;
- synchronized( cancelQueue )
- {
- session = cancelQueue.poll();
- }
-
+ DatagramSessionImpl session = cancelQueue.poll();
if( session == null )
break;
else