You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by jv...@apache.org on 2011/11/30 21:07:37 UTC
svn commit: r1208745 -
/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
Author: jvermillard
Date: Wed Nov 30 20:07:37 2011
New Revision: 1208745
URL: http://svn.apache.org/viewvc?rev=1208745&view=rev
Log:
fixing NPE on write request without futures
Modified:
mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java?rev=1208745&r1=1208744&r2=1208745&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java Wed Nov 30 20:07:37 2011
@@ -39,6 +39,7 @@ import java.util.concurrent.locks.Reentr
import org.apache.mina.api.IoServer;
import org.apache.mina.api.IoService;
import org.apache.mina.api.IoSession;
+import org.apache.mina.api.RuntimeIoException;
import org.apache.mina.service.AbstractIoService;
import org.apache.mina.service.SelectorProcessor;
import org.apache.mina.service.SelectorStrategy;
@@ -68,10 +69,15 @@ public class NioSelectorProcessor implem
private Map<SocketAddress, ServerSocketChannel> serverSocketChannels = new ConcurrentHashMap<SocketAddress, ServerSocketChannel>();
+ // read buffer for all the incoming bytes
private ByteBuffer readBuffer;
+ // the thread polling and processing the I/O events
+ private SelectorWorker worker = null;
+
/**
* new binded server to add to the selector {ServerSocketChannel, IoServer}
+ * jvermillard : FIXME the typing is ugly !!!
*/
private final Queue<Object[]> serversToAdd = new ConcurrentLinkedQueue<Object[]>();
@@ -91,13 +97,14 @@ public class NioSelectorProcessor implem
private Selector selector;
- // Lock for Selector worker, using default. can look into fairness later
+ // Lock for Selector worker, using default. can look into fairness later.
+ // We need to think about a lock less mechanism here.
private Lock workerLock = new ReentrantLock();
public NioSelectorProcessor(String name, SelectorStrategy strategy) {
this.strategy = strategy;
- // TODO : configurable parameter
+ // FIXME : configurable parameter
readBuffer = ByteBuffer.allocate(1024);
}
@@ -112,8 +119,10 @@ public class NioSelectorProcessor implem
wakeupWorker();
}
- private SelectorWorker worker = null;
-
+ /**
+ * Wake the I/O worker thread and if none exists, create a new one
+ * FIXME : too much locking there ?
+ */
private void wakeupWorker() {
workerLock.lock();
try {
@@ -130,9 +139,14 @@ public class NioSelectorProcessor implem
}
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public void bindAndAcceptAddress(IoServer server, SocketAddress address) throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
+
+ // FIXME : should be "genericified"
if (server instanceof AbstractTcpServer) {
serverSocketChannel.socket().setReuseAddress(((AbstractTcpServer) server).isReuseAddress());
}
@@ -142,6 +156,9 @@ public class NioSelectorProcessor implem
add(serverSocketChannel, server);
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public void unbind(SocketAddress address) throws IOException {
ServerSocketChannel channel = serverSocketChannels.get(address);
@@ -155,6 +172,9 @@ public class NioSelectorProcessor implem
wakeupWorker();
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public void createSession(IoService service, Object clientSocket) {
LOGGER.debug("create session");
@@ -167,6 +187,7 @@ public class NioSelectorProcessor implem
socketChannel.configureBlocking(false);
} catch (IOException e) {
LOGGER.error("Unexpected exception, while configuring socket as non blocking", e);
+ throw new RuntimeIoException("cannot configure socket as non-blocking", e);
}
// apply the default service socket configuration
@@ -220,7 +241,31 @@ public class NioSelectorProcessor implem
}
/**
- * The worker processing incoming session creation and destruction requests.
+ * {@inheritDoc}
+ */
+ @Override
+ public void flush(IoSession session) {
+ LOGGER.debug("scheduling session {} for writing", session);
+ // add the session to the list of session to be registered for writing
+ flushingSessions.add((NioTcpSession) session);
+ // wake the selector for unlocking the I/O thread
+ wakeupWorker();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void scheduleForClose(IoSession session) {
+ LOGGER.debug("scheduling session {} for close", session);
+ // add the session in the list of session for close.
+ sessionsToClose.add((NioTcpSession) session);
+ // wake the selector for unlocking the I/O thread
+ wakeupWorker();
+ }
+
+ /**
+ * The worker processing incoming session creation, session destruction requests, session write and reads.
* It will also bind new servers.
*/
private class SelectorWorker extends Thread {
@@ -290,7 +335,7 @@ public class NioSelectorProcessor implem
SelectionKey key = sessionReadKey.remove(session);
key.cancel();
- // needed ?
+ // closing underlying socket
session.getSocketChannel().close();
// fire the event
session.getFilterChain().processSessionClosed(session);
@@ -362,7 +407,10 @@ public class NioSelectorProcessor implem
// it
queue.remove();
// complete the future
- ((DefaultWriteFuture) wreq.getFuture()).complete();
+ DefaultWriteFuture future = (DefaultWriteFuture) wreq.getFuture();
+ if (future != null) {
+ future.complete();
+ }
} else {
// output socket buffer is full, we need
// to give up until next selection for
@@ -441,16 +489,4 @@ public class NioSelectorProcessor implem
}
}
}
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void flush(IoSession session) {
- LOGGER.debug("scheduling session {} for writing", session.toString());
- // add the session to the list of session to be registered for writing
- // wake the selector
- flushingSessions.add((NioTcpSession) session);
- wakeupWorker();
- }
}