You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2008/03/22 19:06:32 UTC
svn commit: r640032 - in
/mina/trunk/core/src/main/java/org/apache/mina/common:
AbstractIoService.java AbstractPollingIoAcceptor.java
Author: trustin
Date: Sat Mar 22 11:06:32 2008
New Revision: 640032
URL: http://svn.apache.org/viewvc?rev=640032&view=rev
Log:
Fixed the same problem in AbstractPollingIoAcceptor, where wakeup() was called after disposal
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoAcceptor.java
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java?rev=640032&r1=640031&r2=640032&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java Sat Mar 22 11:06:32 2008
@@ -50,7 +50,7 @@
s.setLastReadTime(s.getActivationTime());
s.setLastWriteTime(s.getActivationTime());
s.lastThroughputCalculationTime = s.getActivationTime();
-
+
// Start idleness notification.
idleStatusChecker.addService(s);
}
@@ -63,7 +63,7 @@
public void sessionCreated(IoSession session) {}
public void sessionDestroyed(IoSession session) {}
};
-
+
/**
* Current filter chain builder.
*/
@@ -73,7 +73,7 @@
* Current handler.
*/
private IoHandler handler;
-
+
private IoSessionDataStructureFactory sessionDataStructureFactory =
new DefaultIoSessionDataStructureFactory();
@@ -85,7 +85,12 @@
private final Executor executor;
private final String threadName;
private final boolean createdExecutor;
- private final Object disposalLock = new Object();
+
+ /**
+ * A lock object which must be acquired when related resources are
+ * destroyed.
+ */
+ protected final Object disposalLock = new Object();
private volatile boolean disposing;
private volatile boolean disposed;
private IoFuture disposalFuture;
@@ -96,7 +101,7 @@
private final AtomicLong writtenMessages = new AtomicLong();
private long lastReadTime;
private long lastWriteTime;
-
+
private final AtomicLong scheduledWriteBytes = new AtomicLong();
private final AtomicLong scheduledWriteMessages = new AtomicLong();
@@ -130,11 +135,11 @@
private long lastIdleTimeForBoth;
private long lastIdleTimeForRead;
private long lastIdleTimeForWrite;
-
+
/**
* The default {@link IoSessionConfig} which will be used to configure new sessions.
*/
- private IoSessionConfig sessionConfig;
+ private final IoSessionConfig sessionConfig;
protected AbstractIoService(IoSessionConfig sessionConfig, Executor executor) {
if (sessionConfig == null) {
@@ -151,11 +156,11 @@
this.listeners = new IoServiceListenerSupport(this);
this.listeners.add(serviceActivationListener);
this.sessionConfig = sessionConfig;
-
+
// Make JVM load the exception monitor before some transports
// change the thread context class loader.
ExceptionMonitor.getInstance();
-
+
if (executor == null) {
this.executor = Executors.newCachedThreadPool();
this.createdExecutor = true;
@@ -163,9 +168,9 @@
this.executor = executor;
this.createdExecutor = false;
}
-
+
this.threadName = getClass().getSimpleName() + '-' + id.incrementAndGet();
-
+
executeWorker(idleStatusChecker.getNotifyingTask(), "idleStatusChecker");
}
@@ -200,15 +205,15 @@
public final boolean isActive() {
return listeners.isActive();
}
-
+
public final boolean isDisposing() {
return disposing;
}
-
+
public final boolean isDisposed() {
return disposed;
}
-
+
public final void dispose() {
if (disposed) {
return;
@@ -230,7 +235,7 @@
}
}
}
-
+
idleStatusChecker.getNotifyingTask().cancel();
if (disposalFuture != null) {
disposalFuture.awaitUninterruptibly();
@@ -249,7 +254,7 @@
disposed = true;
}
-
+
/**
* Implement this method to release any acquired resources. This method
* is invoked only once by {@link #dispose()}.
@@ -343,11 +348,11 @@
this.throughputCalculationInterval = throughputCalculationInterval;
}
-
+
public final long getThroughputCalculationIntervalInMillis() {
return throughputCalculationInterval * 1000L;
}
-
+
public final double getReadBytesThroughput() {
resetThroughput();
return readBytesThroughput;
@@ -367,23 +372,23 @@
resetThroughput();
return writtenMessagesThroughput;
}
-
+
public final double getLargestReadBytesThroughput() {
return largestReadBytesThroughput;
}
-
+
public final double getLargestWrittenBytesThroughput() {
return largestWrittenBytesThroughput;
}
-
+
public final double getLargestReadMessagesThroughput() {
return largestReadMessagesThroughput;
}
-
+
public final double getLargestWrittenMessagesThroughput() {
return largestWrittenMessagesThroughput;
}
-
+
private void resetThroughput() {
if (getManagedSessionCount() == 0) {
readBytesThroughput = 0;
@@ -400,17 +405,17 @@
if (minInterval == 0 || interval < minInterval) {
return;
}
-
+
long readBytes = this.readBytes.get();
long writtenBytes = this.writtenBytes.get();
long readMessages = this.readMessages.get();
long writtenMessages = this.writtenMessages.get();
-
+
readBytesThroughput = (readBytes - lastReadBytes) * 1000.0 / interval;
writtenBytesThroughput = (writtenBytes - lastWrittenBytes) * 1000.0 / interval;
readMessagesThroughput = (readMessages - lastReadMessages) * 1000.0 / interval;
writtenMessagesThroughput = (writtenMessages - lastWrittenMessages) * 1000.0 / interval;
-
+
if (readBytesThroughput > largestReadBytesThroughput) {
largestReadBytesThroughput = readBytesThroughput;
}
@@ -423,16 +428,16 @@
if (writtenMessagesThroughput > largestWrittenMessagesThroughput) {
largestWrittenMessagesThroughput = writtenMessagesThroughput;
}
-
+
lastReadBytes = readBytes;
lastWrittenBytes = writtenBytes;
lastReadMessages = readMessages;
lastWrittenMessages = writtenMessages;
-
+
lastThroughputCalculationTime = currentTime;
}
}
-
+
public final long getScheduledWriteBytes() {
return scheduledWriteBytes.get();
}
@@ -472,7 +477,7 @@
public final long getLastWriteTime() {
return lastWriteTime;
}
-
+
protected final void setLastWriteTime(long lastWriteTime) {
this.lastWriteTime = lastWriteTime;
}
@@ -523,7 +528,7 @@
if (idleTime < 0) {
throw new IllegalArgumentException("Illegal idle time: " + idleTime);
}
-
+
if (status == IdleStatus.BOTH_IDLE) {
idleTimeForBoth = idleTime;
} else if (status == IdleStatus.READER_IDLE) {
@@ -533,7 +538,7 @@
} else {
throw new IllegalArgumentException("Unknown idle status: " + status);
}
-
+
if (idleTime == 0) {
if (status == IdleStatus.BOTH_IDLE) {
idleCountForBoth = 0;
@@ -607,10 +612,10 @@
throw new IllegalArgumentException("Unknown idle status: " + status);
}
}
-
+
protected final void notifyIdleness(long currentTime) {
updateThroughput(currentTime);
-
+
synchronized (idlenessCheckLock) {
notifyIdleness(
currentTime,
@@ -618,14 +623,14 @@
IdleStatus.BOTH_IDLE, Math.max(
getLastIoTime(),
getLastIdleTime(IdleStatus.BOTH_IDLE)));
-
+
notifyIdleness(
currentTime,
getIdleTimeInMillis(IdleStatus.READER_IDLE),
IdleStatus.READER_IDLE, Math.max(
getLastReadTime(),
getLastIdleTime(IdleStatus.READER_IDLE)));
-
+
notifyIdleness(
currentTime,
getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
@@ -634,7 +639,7 @@
getLastIdleTime(IdleStatus.WRITER_IDLE)));
}
}
-
+
private void notifyIdleness(
long currentTime, long idleTime, IdleStatus status, long lastIoTime) {
if (idleTime > 0 && lastIoTime != 0
@@ -667,7 +672,7 @@
public final int getWriterIdleCount() {
return getIdleCount(IdleStatus.WRITER_IDLE);
}
-
+
public final int getBothIdleTime() {
return getIdleTime(IdleStatus.BOTH_IDLE);
}
@@ -717,7 +722,7 @@
}
public final Set<WriteFuture> broadcast(Object message) {
- // Convert to Set. We do not return a List here because only the
+ // Convert to Set. We do not return a List here because only the
// direct caller of MessageBroadcaster knows the order of write
// operations.
final List<WriteFuture> futures = IoUtil.broadcast(
@@ -734,19 +739,19 @@
}
};
}
-
+
protected final IoServiceListenerSupport getListeners() {
return listeners;
}
-
+
protected final IdleStatusChecker getIdleStatusChecker() {
return idleStatusChecker;
}
-
+
protected final void executeWorker(Runnable worker) {
executeWorker(worker, null);
}
-
+
protected final void executeWorker(Runnable worker, String suffix) {
String actualThreadName = threadName;
if (suffix != null) {
@@ -754,7 +759,7 @@
}
executor.execute(new NamePreservingRunnable(worker, actualThreadName));
}
-
+
// TODO Figure out make it work without causing a compiler error / warning.
@SuppressWarnings("unchecked")
protected final void finishSessionInitialization(
@@ -795,14 +800,14 @@
// DefaultIoFilterChain will notify the future. (We support ConnectFuture only for now).
session.setAttribute(DefaultIoFilterChain.SESSION_CREATED_FUTURE, future);
}
-
+
if (sessionInitializer != null) {
sessionInitializer.initializeSession(session, future);
}
-
+
finishSessionInitialization0(session, future);
}
-
+
/**
* Implement this method to perform additional tasks required for session
* initialization. Do not call this method directly;
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoAcceptor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoAcceptor.java?rev=640032&r1=640031&r2=640032&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoAcceptor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoAcceptor.java Sat Mar 22 11:06:32 2008
@@ -50,7 +50,7 @@
private final Map<SocketAddress, H> boundHandles =
Collections.synchronizedMap(new HashMap<SocketAddress, H>());
-
+
private final ServiceOperationFuture disposalFuture =
new ServiceOperationFuture();
private volatile boolean selectable;
@@ -66,7 +66,7 @@
protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<T>> processorClass, int processorCount) {
this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass, processorCount), true);
}
-
+
protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, IoProcessor<T> processor) {
this(sessionConfig, null, processor, false);
}
@@ -77,14 +77,14 @@
private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<T> processor, boolean createdProcessor) {
super(sessionConfig, executor);
-
+
if (processor == null) {
throw new NullPointerException("processor");
}
-
+
this.processor = processor;
this.createdProcessor = createdProcessor;
-
+
try {
init();
selectable = true;
@@ -196,7 +196,7 @@
private class Worker implements Runnable {
public void run() {
int nHandles = 0;
-
+
while (selectable) {
try {
// gets the number of keys that are ready to go
@@ -233,7 +233,7 @@
}
}
}
-
+
if (selectable && isDisposing()) {
selectable = false;
try {
@@ -242,7 +242,11 @@
}
} finally {
try {
- destroy();
+ synchronized (disposalLock) {
+ if (isDisposing()) {
+ destroy();
+ }
+ }
} catch (Exception e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
} finally {
@@ -271,7 +275,7 @@
if (session == null) {
break;
}
-
+
finishSessionInitialization(session, null, null);
// add the session to the SocketIoProcessor
@@ -295,16 +299,16 @@
if (future == null) {
break;
}
-
+
Map<SocketAddress, H> newHandles = new HashMap<SocketAddress, H>();
List<SocketAddress> localAddresses = future.getLocalAddresses();
-
+
try {
for (SocketAddress a: localAddresses) {
H handle = open(a);
newHandles.put(localAddress(handle), handle);
}
-
+
boundHandles.putAll(newHandles);
// and notify.
@@ -326,7 +330,7 @@
}
}
}
-
+
return 0;
}
@@ -343,7 +347,7 @@
if (future == null) {
break;
}
-
+
// close the channels
for (SocketAddress a: future.getLocalAddresses()) {
H handle = boundHandles.remove(a);
@@ -360,10 +364,10 @@
cancelledHandles ++;
}
}
-
+
future.setDone();
}
-
+
return cancelledHandles;
}