You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by jv...@apache.org on 2011/11/30 21:07:37 UTC

svn commit: r1208745 - /mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java

Author: jvermillard
Date: Wed Nov 30 20:07:37 2011
New Revision: 1208745

URL: http://svn.apache.org/viewvc?rev=1208745&view=rev
Log:
fixing NPE on write request without futures

Modified:
    mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java?rev=1208745&r1=1208744&r2=1208745&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java Wed Nov 30 20:07:37 2011
@@ -39,6 +39,7 @@ import java.util.concurrent.locks.Reentr
 import org.apache.mina.api.IoServer;
 import org.apache.mina.api.IoService;
 import org.apache.mina.api.IoSession;
+import org.apache.mina.api.RuntimeIoException;
 import org.apache.mina.service.AbstractIoService;
 import org.apache.mina.service.SelectorProcessor;
 import org.apache.mina.service.SelectorStrategy;
@@ -68,10 +69,15 @@ public class NioSelectorProcessor implem
 
     private Map<SocketAddress, ServerSocketChannel> serverSocketChannels = new ConcurrentHashMap<SocketAddress, ServerSocketChannel>();
 
+    // read buffer for all the incoming bytes
     private ByteBuffer readBuffer;
 
+    // the thread polling and processing the I/O events 
+    private SelectorWorker worker = null;
+
     /**
      * new binded server to add to the selector {ServerSocketChannel, IoServer}
+     * jvermillard : FIXME the typing is ugly !!!
      */
     private final Queue<Object[]> serversToAdd = new ConcurrentLinkedQueue<Object[]>();
 
@@ -91,13 +97,14 @@ public class NioSelectorProcessor implem
 
     private Selector selector;
 
-    // Lock for Selector worker, using default. can look into fairness later
+    // Lock for Selector worker, using default. can look into fairness later.
+    // We need to think about a lock-free mechanism here.
     private Lock workerLock = new ReentrantLock();
 
     public NioSelectorProcessor(String name, SelectorStrategy strategy) {
         this.strategy = strategy;
 
-        // TODO : configurable parameter
+        // FIXME : configurable parameter
         readBuffer = ByteBuffer.allocate(1024);
     }
 
@@ -112,8 +119,10 @@ public class NioSelectorProcessor implem
         wakeupWorker();
     }
 
-    private SelectorWorker worker = null;
-
+    /**
+     * Wake the I/O worker thread and, if none exists, create a new one.
+     * FIXME : too much locking there ?
+     */
     private void wakeupWorker() {
         workerLock.lock();
         try {
@@ -130,9 +139,14 @@ public class NioSelectorProcessor implem
         }
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public void bindAndAcceptAddress(IoServer server, SocketAddress address) throws IOException {
         ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
+
+        // FIXME : should be "genericified"
         if (server instanceof AbstractTcpServer) {
             serverSocketChannel.socket().setReuseAddress(((AbstractTcpServer) server).isReuseAddress());
         }
@@ -142,6 +156,9 @@ public class NioSelectorProcessor implem
         add(serverSocketChannel, server);
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public void unbind(SocketAddress address) throws IOException {
         ServerSocketChannel channel = serverSocketChannels.get(address);
@@ -155,6 +172,9 @@ public class NioSelectorProcessor implem
         wakeupWorker();
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public void createSession(IoService service, Object clientSocket) {
         LOGGER.debug("create session");
@@ -167,6 +187,7 @@ public class NioSelectorProcessor implem
             socketChannel.configureBlocking(false);
         } catch (IOException e) {
             LOGGER.error("Unexpected exception, while configuring socket as non blocking", e);
+            throw new RuntimeIoException("cannot configure socket as non-blocking", e);
         }
 
         // apply the default service socket configuration
@@ -220,7 +241,31 @@ public class NioSelectorProcessor implem
     }
 
     /**
-     * The worker processing incoming session creation and destruction requests.
+     * {@inheritDoc}
+     */
+    @Override
+    public void flush(IoSession session) {
+        LOGGER.debug("scheduling session {} for writing", session);
+        // add the session to the list of sessions to be registered for writing
+        flushingSessions.add((NioTcpSession) session);
+        // wake the selector for unlocking the I/O thread
+        wakeupWorker();
+    }
+
+    /** 
+     * {@inheritDoc}
+     */
+    @Override
+    public void scheduleForClose(IoSession session) {
+        LOGGER.debug("scheduling session {} for close", session);
+        // add the session to the list of sessions to close.
+        sessionsToClose.add((NioTcpSession) session);
+        // wake the selector for unlocking the I/O thread
+        wakeupWorker();
+    }
+
+    /**
+     * The worker processing session creation and destruction requests, as well as session reads and writes.
      * It will also bind new servers.
      */
     private class SelectorWorker extends Thread {
@@ -290,7 +335,7 @@ public class NioSelectorProcessor implem
                             SelectionKey key = sessionReadKey.remove(session);
                             key.cancel();
 
-                            // needed ?
+                            // closing underlying socket
                             session.getSocketChannel().close();
                             // fire the event
                             session.getFilterChain().processSessionClosed(session);
@@ -362,7 +407,10 @@ public class NioSelectorProcessor implem
                                         // it
                                         queue.remove();
                                         // complete the future
-                                        ((DefaultWriteFuture) wreq.getFuture()).complete();
+                                        DefaultWriteFuture future = (DefaultWriteFuture) wreq.getFuture();
+                                        if (future != null) {
+                                            future.complete();
+                                        }
                                     } else {
                                         // output socket buffer is full, we need
                                         // to give up until next selection for
@@ -441,16 +489,4 @@ public class NioSelectorProcessor implem
             }
         }
     }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void flush(IoSession session) {
-        LOGGER.debug("scheduling session {} for writing", session.toString());
-        // add the session to the list of session to be registered for writing
-        // wake the selector
-        flushingSessions.add((NioTcpSession) session);
-        wakeupWorker();
-    }
 }