You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by el...@apache.org on 2011/02/15 14:57:30 UTC

svn commit: r1070902 - in /mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/core/polling: AbstractPollingIoAcceptor.java AbstractPollingIoConnector.java AbstractPollingIoProcessor.java

Author: elecharny
Date: Tue Feb 15 13:57:29 2011
New Revision: 1070902

URL: http://svn.apache.org/viewvc?rev=1070902&view=rev
Log:
Applied patch from DIRMINA-819

Modified:
    mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java
    mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoConnector.java
    mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java

Modified: mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java
URL: http://svn.apache.org/viewvc/mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java?rev=1070902&r1=1070901&r2=1070902&view=diff
==============================================================================
--- mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java (original)
+++ mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java Tue Feb 15 13:57:29 2011
@@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentHa
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.mina.core.RuntimeIoException;
 import org.apache.mina.core.filterchain.IoFilter;
@@ -70,8 +71,6 @@ public abstract class AbstractPollingIoA
 
     private final boolean createdProcessor;
 
-    private final Object lock = new Object();
-
     private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
 
     private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
@@ -85,7 +84,7 @@ public abstract class AbstractPollingIoA
     private volatile boolean selectable;
 
     /** The thread responsible of accepting incoming requests */ 
-    private Acceptor acceptor;
+    private AtomicReference<Acceptor> acceptorRef = new AtomicReference<Acceptor>();
 
     /**
      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
@@ -355,9 +354,12 @@ public abstract class AbstractPollingIoA
         }
 
         // start the acceptor if not already started
-        synchronized (lock) {
-            if (acceptor == null) {
-                acceptor = new Acceptor();
+        Acceptor acceptor = acceptorRef.get();
+
+        if (acceptor == null) {
+            acceptor = new Acceptor();
+
+            if (acceptorRef.compareAndSet(null, acceptor)) {
                 executeWorker(acceptor);
             }
         }
@@ -390,6 +392,8 @@ public abstract class AbstractPollingIoA
      */
     private class Acceptor implements Runnable {
         public void run() {
+            assert (acceptorRef.get() == this);
+
             int nHandles = 0;
 
             while (selectable) {
@@ -418,13 +422,19 @@ public abstract class AbstractPollingIoA
                     // quit the loop: we don't have any socket listening
                     // for incoming connection.
                     if (nHandles == 0) {
-                        synchronized (lock) {
-                            if (registerQueue.isEmpty()
-                                    && cancelQueue.isEmpty()) {
-                                acceptor = null;
-                                break;
-                            }
+                        acceptorRef.set(null);
+
+                        if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
+                            assert (acceptorRef.get() != this);
+                            break;
+                        }
+                        
+                        if (!acceptorRef.compareAndSet(null, this)) {
+                            assert (acceptorRef.get() != this);
+                            break;
                         }
+                        
+                        assert (acceptorRef.get() == this);
                     }
                 } catch (ClosedSelectorException cse) {
                     // If the selector has been closed, we can exit the loop

Modified: mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoConnector.java
URL: http://svn.apache.org/viewvc/mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoConnector.java?rev=1070902&r1=1070901&r2=1070902&view=diff
==============================================================================
--- mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoConnector.java (original)
+++ mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoConnector.java Tue Feb 15 13:57:29 2011
@@ -27,6 +27,7 @@ import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.mina.core.RuntimeIoException;
 import org.apache.mina.core.filterchain.IoFilter;
@@ -63,7 +64,6 @@ import org.apache.mina.util.ExceptionMon
 public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H>
         extends AbstractIoConnector {
 
-    private final Object lock = new Object();
     private final Queue<ConnectionRequest> connectQueue = new ConcurrentLinkedQueue<ConnectionRequest>();
     private final Queue<ConnectionRequest> cancelQueue = new ConcurrentLinkedQueue<ConnectionRequest>();
     private final IoProcessor<T> processor;
@@ -74,7 +74,7 @@ public abstract class AbstractPollingIoC
     private volatile boolean selectable;
     
     /** The connector thread */
-    private Connector connector;
+    private final AtomicReference<Connector> connectorRef = new AtomicReference<Connector>();
 
     /**
      * Constructor for {@link AbstractPollingIoConnector}. You need to provide a default
@@ -353,9 +353,12 @@ public abstract class AbstractPollingIoC
             cancelQueue.clear();
         }
 
-        synchronized (lock) {
-            if (connector == null) {
-                connector = new Connector();
+        Connector connector = connectorRef.get();
+        
+        if (connector == null) {
+            connector = new Connector();
+            
+            if (connectorRef.compareAndSet(null, connector)) {
                 executeWorker(connector);
             }
         }
@@ -463,7 +466,10 @@ public abstract class AbstractPollingIoC
     private class Connector implements Runnable {
 
         public void run() {
+            assert (connectorRef.get() == this);
+            
             int nHandles = 0;
+            
             while (selectable) {
                 try {
                     // the timeout for select shall be smaller of the connect
@@ -482,12 +488,19 @@ public abstract class AbstractPollingIoC
                     nHandles -= cancelKeys();
 
                     if (nHandles == 0) {
-                        synchronized (lock) {
-                            if (connectQueue.isEmpty()) {
-                                connector = null;
-                                break;
-                            }
+                        connectorRef.set(null);
+
+                        if (connectQueue.isEmpty()) {
+                            assert (connectorRef.get() != this);
+                            break;
+                        }
+                        
+                        if (!connectorRef.compareAndSet(null, this)) {
+                            assert (connectorRef.get() != this);
+                            break;
                         }
+                        
+                        assert (connectorRef.get() == this);
                     }
                 } catch (ClosedSelectorException cse) {
                     // If the selector has been closed, we can exit the loop

Modified: mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java?rev=1070902&r1=1070901&r2=1070902&view=diff
==============================================================================
--- mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java (original)
+++ mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java Tue Feb 15 13:57:29 2011
@@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentLi
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.mina.core.buffer.IoBuffer;
 import org.apache.mina.core.file.FileRegion;
@@ -85,9 +86,6 @@ public abstract class AbstractPollingIoP
     /** A map containing the last Thread ID for each class */
     private static final Map<Class<?>, AtomicInteger> threadIds = new ConcurrentHashMap<Class<?>, AtomicInteger>();
 
-    /** A lock used to protect the processor creation */
-    private final Object lock = new Object();
-
     /** This IoProcessor instance name */
     private final String threadName;
 
@@ -110,7 +108,7 @@ public abstract class AbstractPollingIoP
     private final Queue<S> trafficControllingSessions = new ConcurrentLinkedQueue<S>();
 
     /** The processor thread : it handles the incoming messages */
-    private Processor processor;
+    private final AtomicReference<Processor> processorRef = new AtomicReference<Processor>();
 
     private long lastIdleCheckTime;
 
@@ -456,9 +454,12 @@ public abstract class AbstractPollingIoP
      * pool. The Runnable will be renamed
      */
     private void startupProcessor() {
-        synchronized (lock) {
-            if (processor == null) {
-                processor = new Processor();
+        Processor processor = processorRef.get();
+
+        if (processor == null) {
+            processor = new Processor();
+
+            if (processorRef.compareAndSet(null, processor)) {
                 executor.execute(new NamePreservingRunnable(processor, threadName));
             }
         }
@@ -1077,6 +1078,8 @@ public abstract class AbstractPollingIoP
      */
     private class Processor implements Runnable {
         public void run() {
+            assert (processorRef.get() == this);
+
             int nSessions = 0;
             lastIdleCheckTime = System.currentTimeMillis();
 
@@ -1151,12 +1154,23 @@ public abstract class AbstractPollingIoP
                     // Get a chance to exit the infinite loop if there are no
                     // more sessions on this Processor
                     if (nSessions == 0) {
-                        synchronized (lock) {
-                            if (newSessions.isEmpty() && isSelectorEmpty()) {
-                                processor = null;
-                                break;
-                            }
+                        processorRef.set(null);
+                        
+                        if (newSessions.isEmpty() && isSelectorEmpty()) {
+                            // newSessions.add() precedes startupProcessor
+                            assert (processorRef.get() != this);
+                            break;
+                        }
+                        
+                        assert (processorRef.get() != this);
+                        
+                        if (!processorRef.compareAndSet(null, this)) {
+                            // startupProcessor won race, so must exit processor
+                            assert (processorRef.get() != this);
+                            break;
                         }
+                        
+                        assert (processorRef.get() == this);
                     }
 
                     // Disconnect all sessions immediately if disposal has been