You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by el...@apache.org on 2011/02/15 14:57:30 UTC
svn commit: r1070902 - in
/mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/core/polling:
AbstractPollingIoAcceptor.java AbstractPollingIoConnector.java
AbstractPollingIoProcessor.java
Author: elecharny
Date: Tue Feb 15 13:57:29 2011
New Revision: 1070902
URL: http://svn.apache.org/viewvc?rev=1070902&view=rev
Log:
Applied patch from DIRMINA-819
Modified:
mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java
mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoConnector.java
mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
Modified: mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java
URL: http://svn.apache.org/viewvc/mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java?rev=1070902&r1=1070901&r2=1070902&view=diff
==============================================================================
--- mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java (original)
+++ mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java Tue Feb 15 13:57:29 2011
@@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentHa
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.mina.core.RuntimeIoException;
import org.apache.mina.core.filterchain.IoFilter;
@@ -70,8 +71,6 @@ public abstract class AbstractPollingIoA
private final boolean createdProcessor;
- private final Object lock = new Object();
-
private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
@@ -85,7 +84,7 @@ public abstract class AbstractPollingIoA
private volatile boolean selectable;
/** The thread responsible of accepting incoming requests */
- private Acceptor acceptor;
+ private AtomicReference<Acceptor> acceptorRef = new AtomicReference<Acceptor>();
/**
* Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
@@ -355,9 +354,12 @@ public abstract class AbstractPollingIoA
}
// start the acceptor if not already started
- synchronized (lock) {
- if (acceptor == null) {
- acceptor = new Acceptor();
+ Acceptor acceptor = acceptorRef.get();
+
+ if (acceptor == null) {
+ acceptor = new Acceptor();
+
+ if (acceptorRef.compareAndSet(null, acceptor)) {
executeWorker(acceptor);
}
}
@@ -390,6 +392,8 @@ public abstract class AbstractPollingIoA
*/
private class Acceptor implements Runnable {
public void run() {
+ assert (acceptorRef.get() == this);
+
int nHandles = 0;
while (selectable) {
@@ -418,13 +422,19 @@ public abstract class AbstractPollingIoA
// quit the loop: we don't have any socket listening
// for incoming connection.
if (nHandles == 0) {
- synchronized (lock) {
- if (registerQueue.isEmpty()
- && cancelQueue.isEmpty()) {
- acceptor = null;
- break;
- }
+ acceptorRef.set(null);
+
+ if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
+ assert (acceptorRef.get() != this);
+ break;
+ }
+
+ if (!acceptorRef.compareAndSet(null, this)) {
+ assert (acceptorRef.get() != this);
+ break;
}
+
+ assert (acceptorRef.get() == this);
}
} catch (ClosedSelectorException cse) {
// If the selector has been closed, we can exit the loop
Modified: mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoConnector.java
URL: http://svn.apache.org/viewvc/mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoConnector.java?rev=1070902&r1=1070901&r2=1070902&view=diff
==============================================================================
--- mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoConnector.java (original)
+++ mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoConnector.java Tue Feb 15 13:57:29 2011
@@ -27,6 +27,7 @@ import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.mina.core.RuntimeIoException;
import org.apache.mina.core.filterchain.IoFilter;
@@ -63,7 +64,6 @@ import org.apache.mina.util.ExceptionMon
public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H>
extends AbstractIoConnector {
- private final Object lock = new Object();
private final Queue<ConnectionRequest> connectQueue = new ConcurrentLinkedQueue<ConnectionRequest>();
private final Queue<ConnectionRequest> cancelQueue = new ConcurrentLinkedQueue<ConnectionRequest>();
private final IoProcessor<T> processor;
@@ -74,7 +74,7 @@ public abstract class AbstractPollingIoC
private volatile boolean selectable;
/** The connector thread */
- private Connector connector;
+ private final AtomicReference<Connector> connectorRef = new AtomicReference<Connector>();
/**
* Constructor for {@link AbstractPollingIoConnector}. You need to provide a default
@@ -353,9 +353,12 @@ public abstract class AbstractPollingIoC
cancelQueue.clear();
}
- synchronized (lock) {
- if (connector == null) {
- connector = new Connector();
+ Connector connector = connectorRef.get();
+
+ if (connector == null) {
+ connector = new Connector();
+
+ if (connectorRef.compareAndSet(null, connector)) {
executeWorker(connector);
}
}
@@ -463,7 +466,10 @@ public abstract class AbstractPollingIoC
private class Connector implements Runnable {
public void run() {
+ assert (connectorRef.get() == this);
+
int nHandles = 0;
+
while (selectable) {
try {
// the timeout for select shall be smaller of the connect
@@ -482,12 +488,19 @@ public abstract class AbstractPollingIoC
nHandles -= cancelKeys();
if (nHandles == 0) {
- synchronized (lock) {
- if (connectQueue.isEmpty()) {
- connector = null;
- break;
- }
+ connectorRef.set(null);
+
+ if (connectQueue.isEmpty()) {
+ assert (connectorRef.get() != this);
+ break;
+ }
+
+ if (!connectorRef.compareAndSet(null, this)) {
+ assert (connectorRef.get() != this);
+ break;
}
+
+ assert (connectorRef.get() == this);
}
} catch (ClosedSelectorException cse) {
// If the selector has been closed, we can exit the loop
Modified: mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java?rev=1070902&r1=1070901&r2=1070902&view=diff
==============================================================================
--- mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java (original)
+++ mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java Tue Feb 15 13:57:29 2011
@@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentLi
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.file.FileRegion;
@@ -85,9 +86,6 @@ public abstract class AbstractPollingIoP
/** A map containing the last Thread ID for each class */
private static final Map<Class<?>, AtomicInteger> threadIds = new ConcurrentHashMap<Class<?>, AtomicInteger>();
- /** A lock used to protect the processor creation */
- private final Object lock = new Object();
-
/** This IoProcessor instance name */
private final String threadName;
@@ -110,7 +108,7 @@ public abstract class AbstractPollingIoP
private final Queue<S> trafficControllingSessions = new ConcurrentLinkedQueue<S>();
/** The processor thread : it handles the incoming messages */
- private Processor processor;
+ private final AtomicReference<Processor> processorRef = new AtomicReference<Processor>();
private long lastIdleCheckTime;
@@ -456,9 +454,12 @@ public abstract class AbstractPollingIoP
* pool. The Runnable will be renamed
*/
private void startupProcessor() {
- synchronized (lock) {
- if (processor == null) {
- processor = new Processor();
+ Processor processor = processorRef.get();
+
+ if (processor == null) {
+ processor = new Processor();
+
+ if (processorRef.compareAndSet(null, processor)) {
executor.execute(new NamePreservingRunnable(processor, threadName));
}
}
@@ -1077,6 +1078,8 @@ public abstract class AbstractPollingIoP
*/
private class Processor implements Runnable {
public void run() {
+ assert (processorRef.get() == this);
+
int nSessions = 0;
lastIdleCheckTime = System.currentTimeMillis();
@@ -1151,12 +1154,23 @@ public abstract class AbstractPollingIoP
// Get a chance to exit the infinite loop if there are no
// more sessions on this Processor
if (nSessions == 0) {
- synchronized (lock) {
- if (newSessions.isEmpty() && isSelectorEmpty()) {
- processor = null;
- break;
- }
+ processorRef.set(null);
+
+ if (newSessions.isEmpty() && isSelectorEmpty()) {
+ // newSessions.add() precedes startupProcessor
+ assert (processorRef.get() != this);
+ break;
+ }
+
+ assert (processorRef.get() != this);
+
+ if (!processorRef.compareAndSet(null, this)) {
+ // startupProcessor won race, so must exit processor
+ assert (processorRef.get() != this);
+ break;
}
+
+ assert (processorRef.get() == this);
}
// Disconnect all sessions immediately if disposal has been