You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by ap...@apache.org on 2011/06/30 13:29:32 UTC

svn commit: r1141485 - in /mina/branches/3.0/core/src/main/java/org/apache/mina: IoFilter.java service/client/AbstractIoClient.java transport/tcp/NioSelectorProcessor.java

Author: apaliwal
Date: Thu Jun 30 11:29:31 2011
New Revision: 1141485

URL: http://svn.apache.org/viewvc?rev=1141485&view=rev
Log:
Formatting/Logging changes, added Lock usage instead of synchronized, added exceptionCaught API in IoFilter

Modified:
    mina/branches/3.0/core/src/main/java/org/apache/mina/IoFilter.java
    mina/branches/3.0/core/src/main/java/org/apache/mina/service/client/AbstractIoClient.java
    mina/branches/3.0/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java

Modified: mina/branches/3.0/core/src/main/java/org/apache/mina/IoFilter.java
URL: http://svn.apache.org/viewvc/mina/branches/3.0/core/src/main/java/org/apache/mina/IoFilter.java?rev=1141485&r1=1141484&r2=1141485&view=diff
==============================================================================
--- mina/branches/3.0/core/src/main/java/org/apache/mina/IoFilter.java (original)
+++ mina/branches/3.0/core/src/main/java/org/apache/mina/IoFilter.java Thu Jun 30 11:29:31 2011
@@ -96,4 +96,13 @@ public interface IoFilter {
      * @throws Exception Exception If an error occurs while processing
      */
     void messageReceived(IoSession session, Object message) throws Exception;
+
+    /**
+     * Invoked when an exception occurs while executing the method
+     *
+     * @param session   {@link IoSession} associated with invocation
+     * @param cause     Real {@link Throwable} which broke the normal chain processing
+     * @throws Exception If an error occurs while processing
+     */
+    void exceptionCaught(IoSession session, Throwable cause) throws Exception;
 }
\ No newline at end of file

Modified: mina/branches/3.0/core/src/main/java/org/apache/mina/service/client/AbstractIoClient.java
URL: http://svn.apache.org/viewvc/mina/branches/3.0/core/src/main/java/org/apache/mina/service/client/AbstractIoClient.java?rev=1141485&r1=1141484&r2=1141485&view=diff
==============================================================================
--- mina/branches/3.0/core/src/main/java/org/apache/mina/service/client/AbstractIoClient.java (original)
+++ mina/branches/3.0/core/src/main/java/org/apache/mina/service/client/AbstractIoClient.java Thu Jun 30 11:29:31 2011
@@ -42,51 +42,34 @@ public class AbstractIoClient extends Ab
     }
 
     @Override
-    public Map<Long, IoSession> getManagedSessions()
-    {
-        // TODO Auto-generated method stub
+    public Map<Long, IoSession> getManagedSessions() {
         return null;
     }
 
     @Override
-    public void addListener( IoServiceListener listener )
-    {
-        // TODO Auto-generated method stub
-        
+    public void addListener( IoServiceListener listener ) {
     }
 
     @Override
-    public void removeListener( IoServiceListener listener )
-    {
-        // TODO Auto-generated method stub
-        
+    public void removeListener( IoServiceListener listener ) {
     }
 
     @Override
-    public long getConnectTimeoutMillis()
-    {
-        // TODO Auto-generated method stub
+    public long getConnectTimeoutMillis() {
         return 0;
     }
 
     @Override
-    public void setConnectTimeoutMillis( long connectTimeoutInMillis )
-    {
-        // TODO Auto-generated method stub
-        
+    public void setConnectTimeoutMillis( long connectTimeoutInMillis ) {
     }
 
     @Override
-    public ConnectFuture connect( SocketAddress remoteAddress )
-    {
-        // TODO Auto-generated method stub
+    public ConnectFuture connect( SocketAddress remoteAddress ) {
         return null;
     }
 
     @Override
-    public ConnectFuture connect( SocketAddress remoteAddress, SocketAddress localAddress )
-    {
-        // TODO Auto-generated method stub
+    public ConnectFuture connect( SocketAddress remoteAddress, SocketAddress localAddress ) {
         return null;
     }
 

Modified: mina/branches/3.0/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
URL: http://svn.apache.org/viewvc/mina/branches/3.0/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java?rev=1141485&r1=1141484&r2=1141485&view=diff
==============================================================================
--- mina/branches/3.0/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java (original)
+++ mina/branches/3.0/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java Thu Jun 30 11:29:31 2011
@@ -33,6 +33,8 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.mina.IoServer;
 import org.apache.mina.IoService;
@@ -62,7 +64,7 @@ public class NioSelectorProcessor implem
 
     private SelectorStrategy strategy;
 
-    private Logger log;
+    private static final Logger LOGGER = LoggerFactory.getLogger(NioSelectorProcessor.class);
 
     private Map<SocketAddress, ServerSocketChannel> serverSocketChannels = new ConcurrentHashMap<SocketAddress, ServerSocketChannel>();
 
@@ -89,9 +91,11 @@ public class NioSelectorProcessor implem
 
     private Selector selector;
 
+    // Lock for Selector worker, using default. can look into fairness later
+    private Lock workerLock = new ReentrantLock();
+
     public NioSelectorProcessor(String name, SelectorStrategy strategy) {
         this.strategy = strategy;
-        this.log = LoggerFactory.getLogger("SelectorProcessor[" + name + "]");
 
         // TODO : configurable parameter
         readBuffer = ByteBuffer.allocate(1024);
@@ -103,22 +107,24 @@ public class NioSelectorProcessor implem
      * @param serverChannel
      */
     private void add(ServerSocketChannel serverChannel, IoServer server) {
-        log.debug("adding a server channel {} for server {}", serverChannel, server);
+        LOGGER.debug("adding a server channel {} for server {}", serverChannel, server);
         serversToAdd.add(new Object[] { serverChannel, server });
         wakeupWorker();
     }
 
-    private Object workerLock = new Object();
-
     private SelectorWorker worker = null;
 
     private void wakeupWorker() {
-        synchronized (workerLock) {
+        workerLock.lock();
+        try {
             if (worker == null) {
                 worker = new SelectorWorker();
                 worker.start();
             }
+        } finally {
+            workerLock.unlock();
         }
+
         if (selector != null) {
             selector.wakeup();
         }
@@ -140,14 +146,14 @@ public class NioSelectorProcessor implem
         channel.socket().close();
         channel.close();
         serverSocketChannels.remove(channel);
-        log.debug("removing a server channel " + channel);
+        LOGGER.debug("removing a server channel " + channel);
         serversToRemove.add(channel);
         wakeupWorker();
     }
 
     @Override
     public void createSession(IoService service, Object clientSocket) {
-        log.debug("create session");
+        LOGGER.debug("create session");
         SocketChannel socketChannel = (SocketChannel) clientSocket;
         NioTcpSession session = new NioTcpSession((NioTcpServer) service, socketChannel,
                 strategy.getSelectorForNewSession(this));
@@ -156,7 +162,7 @@ public class NioSelectorProcessor implem
         try {
             socketChannel.configureBlocking(false);
         } catch (IOException e) {
-            log.error("Unexpected exception, while configuring socket as non blocking", e);
+            LOGGER.error("Unexpected exception, while configuring socket as non blocking", e);
         }
 
         // TODO : event session created
@@ -180,12 +186,12 @@ public class NioSelectorProcessor implem
         @Override
         public void run() {
             if (selector == null) {
-                log.debug("opening a new selector");
+                LOGGER.debug("opening a new selector");
 
                 try {
                     selector = Selector.open();
                 } catch (IOException e) {
-                    log.error("IOException while opening a new Selector", e);
+                    LOGGER.error("IOException while opening a new Selector", e);
                 }
             }
 
@@ -198,7 +204,7 @@ public class NioSelectorProcessor implem
                             SelectionKey key = serverKey.remove(channel);
 
                             if (key == null) {
-                                log.error("The server socket was already removed of the selector");
+                                LOGGER.error("The server socket was already removed of the selector");
                             } else {
                                 key.cancel();
                             }
@@ -243,9 +249,9 @@ public class NioSelectorProcessor implem
                         }
                     }
 
-                    log.debug("selecting...");
+                    LOGGER.debug("selecting...");
                     int readyCount = selector.select(SELECT_TIMEOUT);
-                    log.debug("... done selecting : {}", readyCount);
+                    LOGGER.debug("... done selecting : {}", readyCount);
 
                     if (readyCount > 0) {
 
@@ -262,14 +268,14 @@ public class NioSelectorProcessor implem
                             selector.selectedKeys().remove(key);
 
                             if (key.isReadable()) {
-                                log.debug("readable client {}", key);
+                                LOGGER.debug("readable client {}", key);
                                 NioTcpSession session = (NioTcpSession) key.attachment();
                                 SocketChannel channel = session.getSocketChannel();
                                 int readCount = channel.read(readBuffer);
-                                log.debug("read {} bytes", readCount);
+                                LOGGER.debug("read {} bytes", readCount);
                                 if (readCount < 0) {
                                     // session closed by the remote peer
-                                    log.debug("session closed by the remote peer");
+                                    LOGGER.debug("session closed by the remote peer");
                                     sessionsToClose.add(session);
                                 } else {
                                     // we have read some data
@@ -280,7 +286,7 @@ public class NioSelectorProcessor implem
 
                             }
                             if (key.isWritable()) {
-                                log.debug("writable session : {}", key.attachment());
+                                LOGGER.debug("writable session : {}", key.attachment());
                                 NioTcpSession session = (NioTcpSession) key.attachment();
                                 // write from the session write queue
                                 WriteQueue queue = session.getWriteQueue();
@@ -292,7 +298,7 @@ public class NioSelectorProcessor implem
                                     }
                                     ByteBuffer buf = (ByteBuffer) wreq.getMessage();
                                     int wrote = session.getSocketChannel().write(buf);
-                                    log.debug("wrote {} bytes to {}", wrote, session);
+                                    LOGGER.debug("wrote {} bytes to {}", wrote, session);
                                     if (buf.remaining() == 0) {
                                         // completed write request, let's remove
                                         // it
@@ -313,12 +319,12 @@ public class NioSelectorProcessor implem
                                     // Selector for reads and another for the writes
                                     SelectionKey readKey = sessionReadKey.get(session);
                                     if (readKey != null) {
-                                        log.debug("registering key for only reading");
+                                        LOGGER.debug("registering key for only reading");
                                         SelectionKey mykey = session.getSocketChannel().register(selector,
                                                 SelectionKey.OP_READ, session);
                                         sessionReadKey.put(session, mykey);
                                     } else {
-                                        log.debug("cancel key for writing");
+                                        LOGGER.debug("cancel key for writing");
                                         session.getSocketChannel().keyFor(selector).cancel();
                                     }
                                 }
@@ -326,12 +332,12 @@ public class NioSelectorProcessor implem
                             }
 
                             if (key.isAcceptable()) {
-                                log.debug("acceptable new client {}", key);
+                                LOGGER.debug("acceptable new client {}", key);
                                 ServerSocketChannel serverSocket = (ServerSocketChannel) ((Object[]) key.attachment())[0];
                                 IoServer server = (IoServer) (((Object[]) key.attachment())[1]);
                                 // accepted connection
                                 SocketChannel newClientChannel = serverSocket.accept();
-                                log.debug("client accepted");
+                                LOGGER.debug("client accepted");
                                 // and give it's to the strategy
                                 strategy.getSelectorForNewSession(NioSelectorProcessor.this).createSession(server,
                                         newClientChannel);
@@ -359,15 +365,18 @@ public class NioSelectorProcessor implem
                         }
                     }
                 } catch (IOException e) {
-                    log.error("IOException while selecting selector", e);
+                    LOGGER.error("IOException while selecting selector", e);
                 }
 
                 // stop the worker if needed
-                synchronized (workerLock) {
+                workerLock.lock();
+                try {
                     if (selector.keys().isEmpty()) {
                         worker = null;
                         break;
                     }
+                }finally {
+                    workerLock.unlock();
                 }
             }
         }
@@ -375,7 +384,7 @@ public class NioSelectorProcessor implem
 
     @Override
     public void flush(IoSession session) {
-        log.debug("scheduling session {} for writing", session.toString());
+        LOGGER.debug("scheduling session {} for writing", session.toString());
         // add the session to the list of session to be registered for writing
         // wake the selector
         flushingSessions.add((NioTcpSession) session);