You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2008/03/21 13:10:51 UTC
svn commit: r639601 - in /mina/trunk:
core/src/main/java/org/apache/mina/common/
core/src/main/java/org/apache/mina/transport/socket/nio/
transport-apr/src/main/java/org/apache/mina/transport/socket/apr/
Author: trustin
Date: Fri Mar 21 05:10:25 2008
New Revision: 639601
URL: http://svn.apache.org/viewvc?rev=639601&view=rev
Log:
Resolved issue: DIRMINA-514 (Session closing problem on Mac OS X)
* Applied Rob Butler's patch, which should hopefully fix this problem
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoProcessor.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java
mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoProcessor.java?rev=639601&r1=639600&r2=639601&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoProcessor.java Fri Mar 21 05:10:25 2008
@@ -48,10 +48,10 @@
* It improves memory utilization and write throughput significantly.
*/
private static final int WRITE_SPIN_COUNT = 256;
-
+
private static final Map<Class<?>, AtomicInteger> threadIds =
new CopyOnWriteMap<Class<?>, AtomicInteger>();
-
+
private final Object lock = new Object();
private final String threadName;
private final Executor executor;
@@ -74,11 +74,11 @@
if (executor == null) {
throw new NullPointerException("executor");
}
-
+
this.threadName = nextThreadName();
this.executor = executor;
}
-
+
private String nextThreadName() {
Class<?> cls = getClass();
AtomicInteger threadId = threadIds.get(cls);
@@ -89,18 +89,18 @@
} else {
newThreadId = threadId.incrementAndGet();
}
-
+
return cls.getSimpleName() + '-' + newThreadId;
}
-
+
public final boolean isDisposing() {
return disposing;
}
-
+
public final boolean isDisposed() {
return disposed;
}
-
+
public final void dispose() {
if (disposed) {
return;
@@ -112,11 +112,11 @@
startupWorker();
}
}
-
+
disposalFuture.awaitUninterruptibly();
disposed = true;
}
-
+
protected abstract void dispose0() throws Exception;
/**
@@ -126,6 +126,7 @@
* @throws Exception if some low level IO error occurs
*/
protected abstract boolean select(int timeout) throws Exception;
+ protected abstract boolean isSelectorEmpty();
protected abstract void wakeup();
protected abstract Iterator<T> allSessions();
protected abstract Iterator<T> selectedSessions();
@@ -248,7 +249,7 @@
break;
}
-
+
if (addNow(session)) {
addedSessions ++;
}
@@ -344,14 +345,14 @@
private void clearWriteRequestQueue(T session) {
WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
WriteRequest req;
-
+
List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();
-
+
if ((req = writeRequestQueue.poll(session)) != null) {
Object m = req.getMessage();
if (m instanceof IoBuffer) {
IoBuffer buf = (IoBuffer) req.getMessage();
-
+
// The first unwritten empty buffer must be
// forwarded to the filter chain.
if (buf.hasRemaining()) {
@@ -363,13 +364,13 @@
} else {
failedRequests.add(req);
}
-
+
// Discard others.
while ((req = writeRequestQueue.poll(session)) != null) {
failedRequests.add(req);
}
}
-
+
// Create an exception and notify.
if (!failedRequests.isEmpty()) {
WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
@@ -501,12 +502,12 @@
scheduleRemove(session);
return false;
}
-
- final boolean hasFragmentation =
+
+ final boolean hasFragmentation =
session.getTransportMetadata().hasFragmentation();
final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
-
+
// Set limitation for the number of written bytes for read-write
// fairness. I used maxReadBufferSize * 3 / 2, which yields best
// performance in my experience while not breaking fairness much.
@@ -526,7 +527,7 @@
}
session.setCurrentWriteRequest(req);
}
-
+
int localWrittenBytes = 0;
Object message = req.getMessage();
if (message instanceof IoBuffer) {
@@ -537,7 +538,7 @@
localWrittenBytes = writeFile(
session, req, hasFragmentation,
maxWrittenBytes - writtenBytes);
-
+
// Fix for Java bug on Linux http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988
// If there's still data to be written in the FileRegion, return 0 indicating that we need
// to pause until writing may resume.
@@ -549,9 +550,9 @@
} else {
throw new IllegalStateException("Don't know how to handle message of type '" + message.getClass().getName() + "'. Are you missing a protocol encoder?");
}
-
+
writtenBytes += localWrittenBytes;
-
+
if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
// Kernel buffer is full or wrote too much.
setInterestedInWrite(session, true);
@@ -695,13 +696,13 @@
if (nSessions == 0) {
synchronized (lock) {
- if (newSessions.isEmpty()) {
+ if (newSessions.isEmpty() && isSelectorEmpty()) {
worker = null;
break;
}
}
}
-
+
// Disconnect all sessions immediately if disposal has been
// requested so that we exit this loop eventually.
if (isDisposing()) {
@@ -720,7 +721,7 @@
}
}
}
-
+
workerThread = null;
if (isDisposing()) {
try {
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java?rev=639601&r1=639600&r2=639601&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java Fri Mar 21 05:10:25 2008
@@ -47,7 +47,7 @@
throw new RuntimeIoException("Failed to open a selector.", e);
}
}
-
+
private final Selector selector;
public NioProcessor(Executor executor) {
@@ -66,6 +66,11 @@
}
@Override
+ protected boolean isSelectorEmpty() {
+ return selector.keys().isEmpty();
+ }
+
+ @Override
protected void wakeup() {
selector.wakeup();
}
@@ -198,7 +203,7 @@
protected static class IoSessionIterator implements Iterator<NioSession> {
private final Iterator<SelectionKey> i;
private IoSessionIterator(Set<SelectionKey> keys) {
- i = keys.iterator();
+ i = keys.iterator();
}
public boolean hasNext() {
return i.hasNext();
Modified: mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java?rev=639601&r1=639600&r2=639601&view=diff
==============================================================================
--- mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java (original)
+++ mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java Fri Mar 21 05:10:25 2008
@@ -6,16 +6,16 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
- *
+ * under the License.
+ *
*/
package org.apache.mina.transport.socket.apr;
@@ -39,17 +39,17 @@
/**
* The class in charge of processing socket level IO events for the {@link AprSocketConnector}
- *
+ *
* @author The Apache MINA Project (dev@mina.apache.org)
* @version $Rev$, $Date$
*/
public final class AprIoProcessor extends AbstractPollingIoProcessor<AprSession> {
private static final int POLLSET_SIZE = 1024;
-
+
private final Map<Long, AprSession> allSessions =
new HashMap<Long, AprSession>(POLLSET_SIZE);
-
+
private final Object wakeupLock = new Object();
private long wakeupSocket;
private volatile boolean toBeWakenUp;
@@ -62,7 +62,7 @@
public AprIoProcessor(Executor executor) {
super(executor);
-
+
try {
wakeupSocket = Socket.create(
Socket.APR_INET, Socket.SOCK_DGRAM, Socket.APR_PROTO_UDP, AprLibrary
@@ -77,7 +77,7 @@
// initialize a memory pool for APR functions
bufferPool = Pool.create(AprLibrary.getInstance().getRootPool());
-
+
boolean success = false;
long newPollset;
try {
@@ -94,7 +94,7 @@
Poll.APR_POLLSET_THREADSAFE,
Long.MAX_VALUE);
}
-
+
pollset = newPollset;
if (pollset < 0) {
if (Status.APR_STATUS_IS_ENOTIMPL(- (int) pollset)) {
@@ -130,7 +130,7 @@
if (rv != -120001) {
throwException(rv);
}
-
+
rv = Poll.maintain(pollset, polledSockets, true);
if (rv > 0) {
for (int i = 0; i < rv; i ++) {
@@ -139,7 +139,7 @@
if (session == null) {
continue;
}
-
+
int flag = (session.isInterestedInRead()? Poll.APR_POLLIN : 0) |
(session.isInterestedInWrite()? Poll.APR_POLLOUT : 0);
@@ -169,23 +169,28 @@
if (session == null) {
continue;
}
-
+
session.setReadable((flag & Poll.APR_POLLIN) != 0);
session.setWritable((flag & Poll.APR_POLLOUT) != 0);
-
+
polledSessions.add(session);
}
-
+
return !polledSessions.isEmpty();
}
}
@Override
+ protected boolean isSelectorEmpty() {
+ return allSessions.isEmpty();
+ }
+
+ @Override
protected void wakeup() {
if (toBeWakenUp) {
return;
}
-
+
// Add a dummy socket to the pollset.
synchronized (wakeupLock) {
toBeWakenUp = true;
@@ -208,12 +213,12 @@
long s = session.getDescriptor();
Socket.optSet(s, Socket.APR_SO_NONBLOCK, 1);
Socket.timeoutSet(s, 0);
-
+
int rv = Poll.add(pollset, s, Poll.APR_POLLIN);
if (rv != Status.APR_SUCCESS) {
throwException(rv);
}
-
+
session.setInterestedInRead(true);
allSessions.put(s, session);
}
@@ -342,7 +347,7 @@
buf.skip(writtenBytes);
}
}
-
+
if (writtenBytes < 0) {
if (Status.APR_STATUS_IS_EAGAIN(-writtenBytes)) {
writtenBytes = 0;