You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by jv...@apache.org on 2012/12/28 14:01:21 UTC

svn commit: r1426490 - in /mina/mina/trunk: core/src/main/java/org/apache/mina/api/ core/src/main/java/org/apache/mina/service/client/ core/src/main/java/org/apache/mina/session/ core/src/main/java/org/apache/mina/transport/nio/ core/src/main/java/org/...

Author: jvermillard
Date: Fri Dec 28 13:01:20 2012
New Revision: 1426490

URL: http://svn.apache.org/viewvc?rev=1426490&view=rev
Log:
TCP client (WIP)

Added:
    mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/RegistrationCallback.java
    mina/mina/trunk/examples/src/main/java/org/apache/mina/examples/echoclient/
    mina/mina/trunk/examples/src/main/java/org/apache/mina/examples/echoclient/NioEchoClient.java
Removed:
    mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSession.java
Modified:
    mina/mina/trunk/core/src/main/java/org/apache/mina/api/IoClient.java
    mina/mina/trunk/core/src/main/java/org/apache/mina/service/client/AbstractIoClient.java
    mina/mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
    mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java
    mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpClient.java
    mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpServer.java
    mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java
    mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpClient.java
    mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpServer.java
    mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorListener.java
    mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorLoop.java
    mina/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpClient.java
    mina/mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpClient.java
    mina/mina/trunk/core/src/test/java/org/apache/mina/session/AbstractIoSessionTest.java
    mina/mina/trunk/core/src/test/java/org/apache/mina/transport/tcp/NioTcpServerFilterEventTest.java
    mina/mina/trunk/core/src/test/resources/log4j2-test.xml

Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/api/IoClient.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/api/IoClient.java?rev=1426490&r1=1426489&r2=1426490&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/api/IoClient.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/api/IoClient.java Fri Dec 28 13:01:20 2012
@@ -19,48 +19,37 @@
  */
 package org.apache.mina.api;
 
+import java.io.IOException;
 import java.net.SocketAddress;
 
 /**
- * Connects to endpoint, communicates with the server, and fires events to
- * {@link org.apache.mina.service.IoHandler}s.
- *
+ * Connects to endpoint, communicates with the server, and fires events to {@link org.apache.mina.service.IoHandler}s.
+ * 
  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
  */
 public interface IoClient extends IoService {
     /**
-     * Returns the connect timeout in milliseconds. The default value is 1
-     * minute.
-     *
+     * Returns the connect timeout in milliseconds. The default value is 1 minute.
+     * 
      * @return the connect timeout in milliseconds
      */
     long getConnectTimeoutMillis();
 
     /**
      * Sets the connect timeout in milliseconds. The default value is 1 minute.
-     *
+     * 
      * @param connectTimeoutInMillis Connection timeout in ms
      */
     void setConnectTimeoutMillis(long connectTimeoutInMillis);
 
     /**
      * Connects to the specified remote address.
-     *
+     * 
      * @param remoteAddress Remote {@link SocketAddress} to connect
-     * @return the {@link IoFuture} instance which is completed when the
-     *         connection attempt initiated by this call succeeds or fails.
+     * @return the {@link IoFuture} instance which is completed when the connection attempt initiated by this call
+     *         succeeds or fails.
+     * @throws IOException
      */
-    IoFuture<IoSession> connect(SocketAddress remoteAddress);
+    IoFuture<IoSession> connect(SocketAddress remoteAddress) throws IOException;
 
-    /**
-     * Connects to the specified remote address binding to the specified local
-     * address.
-     *
-     * @param remoteAddress Remote {@link SocketAddress} to connect
-     * @param localAddress  Local {@link SocketAddress} to use while initiating connection to
-     *                      remote {@link SocketAddress}
-     * @return the {@link IoFuture} instance which is completed when the
-     *         connection attempt initiated by this call succeeds or fails.
-     */
-    IoFuture<IoSession> connect(SocketAddress remoteAddress, SocketAddress localAddress);
 }

Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/service/client/AbstractIoClient.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/service/client/AbstractIoClient.java?rev=1426490&r1=1426489&r2=1426490&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/service/client/AbstractIoClient.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/service/client/AbstractIoClient.java Fri Dec 28 13:01:20 2012
@@ -19,11 +19,7 @@
  */
 package org.apache.mina.service.client;
 
-import java.net.SocketAddress;
-
 import org.apache.mina.api.IoClient;
-import org.apache.mina.api.IoFuture;
-import org.apache.mina.api.IoSession;
 import org.apache.mina.service.AbstractIoService;
 import org.apache.mina.service.executor.IoHandlerExecutor;
 
@@ -33,6 +29,9 @@ import org.apache.mina.service.executor.
  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
  */
 public abstract class AbstractIoClient extends AbstractIoService implements IoClient {
+
+    private long connectTimeoutInMillis = 10000;
+
     /**
      * Create an new AbstractIoClient instance
      */
@@ -42,21 +41,12 @@ public abstract class AbstractIoClient e
 
     @Override
     public long getConnectTimeoutMillis() {
-        return 0;
+        return connectTimeoutInMillis;
     }
 
     @Override
     public void setConnectTimeoutMillis(long connectTimeoutInMillis) {
-    }
-
-    @Override
-    public IoFuture<IoSession> connect(SocketAddress remoteAddress) {
-        return null;
-    }
-
-    @Override
-    public IoFuture<IoSession> connect(SocketAddress remoteAddress, SocketAddress localAddress) {
-        return null;
+        this.connectTimeoutInMillis = connectTimeoutInMillis;
     }
 
 }

Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java?rev=1426490&r1=1426489&r2=1426490&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java Fri Dec 28 13:01:20 2012
@@ -643,9 +643,9 @@ public abstract class AbstractIoSession 
     }
 
     /**
-     * process session opened event using the filter chain. To be called by the session {@link SelectorLoop} .
+     * process session open event using the filter chain. To be called by the session {@link SelectorLoop} .
      */
-    public void processSessionOpened() {
+    public void processSessionOpen() {
         LOG.debug("processing session open event");
 
         try {

Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java?rev=1426490&r1=1426489&r2=1426490&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java Fri Dec 28 13:01:20 2012
@@ -29,7 +29,6 @@ import java.util.Iterator;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
-import org.apache.mina.api.IoSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -101,25 +100,20 @@ public class NioSelectorLoop implements 
      * {@inheritDoc}
      */
     @Override
-    public void register(boolean accept, boolean read, boolean write, SelectorListener listener,
-            SelectableChannel channel) {
-        register(null, accept, read, write, listener, channel);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void register(IoSession session, boolean accept, boolean read, boolean write, SelectorListener listener,
-            SelectableChannel channel) {
-        logger.debug("registering : {} for accept : {}, read : {}, write : {}", new Object[] { listener, accept, read,
-                                write });
+    public void register(boolean accept, boolean connect, boolean read, boolean write, SelectorListener listener,
+            SelectableChannel channel, RegistrationCallback callback) {
+        logger.debug("registering : {} for accept : {}, connect: {}, read : {}, write : {}, channel : {}",
+                new Object[] { listener, accept, connect, read, write, channel });
         int ops = 0;
 
         if (accept) {
             ops |= SelectionKey.OP_ACCEPT;
         }
 
+        if (connect) {
+            ops |= SelectionKey.OP_CONNECT;
+        }
+
         if (read) {
             ops |= SelectionKey.OP_READ;
         }
@@ -129,7 +123,7 @@ public class NioSelectorLoop implements 
         }
 
         // TODO : if it's the same selector/worker, we don't need to do that we could directly enqueue
-        registrationQueue.add(new Registration(session, ops, channel, listener));
+        registrationQueue.add(new Registration(ops, channel, listener, callback));
 
         // Now, wakeup the selector in order to let it update the selectionKey status
         selector.wakeup();
@@ -141,8 +135,8 @@ public class NioSelectorLoop implements 
     @Override
     public void modifyRegistration(final boolean accept, final boolean read, final boolean write,
             final SelectorListener listener, final SelectableChannel channel) {
-        logger.debug("modifying registration : {} for accept : {}, read : {}, write : {}", new Object[] { listener,
-                                accept, read, write });
+        logger.debug("modifying registration : {} for accept : {}, read : {}, write : {}, channel : {}", new Object[] {
+                                listener, accept, read, write, channel });
 
         final SelectionKey key = channel.keyFor(selector);
         if (key == null) {
@@ -161,7 +155,9 @@ public class NioSelectorLoop implements 
             ops |= SelectionKey.OP_WRITE;
         }
         key.interestOps(ops);
-        key.selector().wakeup();
+
+        // we need to wakeup for the registration to be modified (TODO : not needed if we are in the worker thread)
+        selector.wakeup();
     }
 
     /**
@@ -205,10 +201,12 @@ public class NioSelectorLoop implements 
                     while (it.hasNext()) {
                         final SelectionKey key = it.next();
                         final SelectorListener listener = (SelectorListener) key.attachment();
-                        listener.ready(key.isAcceptable(), key.isReadable(), key.isReadable() ? readBuffer : null,
-                                key.isWritable());
+                        logger.debug("key : {}", key);
+                        listener.ready(key.isAcceptable(), key.isConnectable(), key.isReadable(),
+                                key.isReadable() ? readBuffer : null, key.isWritable());
                         // if you don't remove the event of the set, the selector will present you this event again and
                         // again
+                        logger.debug("remove");
                         it.remove();
                     }
 
@@ -219,10 +217,8 @@ public class NioSelectorLoop implements 
                         try {
                             SelectionKey selectionKey = reg.channel.register(selector, reg.ops, reg.listener);
 
-                            IoSession session = reg.getSession();
-
-                            if (session != null) {
-                                ((NioSession) session).setSelectionKey(selectionKey);
+                            if (reg.getCallback() != null) {
+                                reg.getCallback().done(selectionKey);
                             }
                         } catch (final ClosedChannelException ex) {
                             // dead session..
@@ -236,13 +232,18 @@ public class NioSelectorLoop implements 
         }
     }
 
-    private class Registration {
+    @Override
+    public void wakeup() {
+        selector.wakeup();
+    }
+
+    private static class Registration {
 
-        public Registration(IoSession session, int ops, SelectableChannel channel, SelectorListener listener) {
+        public Registration(int ops, SelectableChannel channel, SelectorListener listener, RegistrationCallback callback) {
             this.ops = ops;
             this.channel = channel;
             this.listener = listener;
-            this.session = session;
+            this.callback = callback;
         }
 
         private final int ops;
@@ -251,15 +252,10 @@ public class NioSelectorLoop implements 
 
         private final SelectorListener listener;
 
-        private final IoSession session;
+        private final RegistrationCallback callback;
 
-        public IoSession getSession() {
-            return session;
+        public RegistrationCallback getCallback() {
+            return callback;
         }
     }
-
-    @Override
-    public void wakeup() {
-        selector.wakeup();
-    }
 }
\ No newline at end of file

Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpClient.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpClient.java?rev=1426490&r1=1426489&r2=1426490&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpClient.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpClient.java Fri Dec 28 13:01:20 2012
@@ -19,26 +19,126 @@
  */
 package org.apache.mina.transport.nio;
 
-import org.apache.mina.api.IoSessionConfig;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+import org.apache.mina.api.IoFuture;
+import org.apache.mina.api.IoSession;
+import org.apache.mina.service.executor.InOrderHandlerExecutor;
 import org.apache.mina.service.executor.IoHandlerExecutor;
+import org.apache.mina.service.idlechecker.IdleChecker;
+import org.apache.mina.service.idlechecker.IndexedIdleChecker;
 import org.apache.mina.transport.tcp.AbstractTcpClient;
+import org.apache.mina.util.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * TODO
+ * This class implements a TCP NIO based client.
  * 
  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
  */
 public class NioTcpClient extends AbstractTcpClient {
+
+    /** A logger for this class */
+    static final Logger LOG = LoggerFactory.getLogger(NioTcpClient.class);
+
+    // the SelectorLoop for connecting the sessions
+    private final SelectorLoop connectSelectorLoop;
+
+    // the Selectorloop for handling read/write session events
+    private final SelectorLoopPool readWriteSelectorPool;
+
+    // for detecting idle session
+    private IdleChecker idleChecker;
+
+    /**
+     * Create a TCP client with new selector pool of default size and a {@link IoHandlerExecutor} of default type (
+     * {@link InOrderHandlerExecutor})
+     */
+    public NioTcpClient() {
+        this(new NioSelectorLoop("connect", 0), new FixedSelectorLoopPool(
+                Runtime.getRuntime().availableProcessors() + 1), null);
+    }
+
+    /**
+     * Create a TCP client with provided selector loops pool. We will use one SelectorLoop get from the pool to manage
+     * the OP_CONNECT events. If the pool contains only one SelectorLoop, then all the events will be managed by the
+     * same Selector.
+     * 
+     * @param selectorLoopPool the pool of selector loops handling connect and read/write events of the sessions
+     * @param handlerExecutor used for executing IoHandler event in another pool of thread (not in the low level I/O
+     *        one). Use <code>null</code> if you don't want one. Be careful, the IoHandler processing will block the I/O
+     *        operations.
+     */
+    public NioTcpClient(SelectorLoopPool selectorLoopPool, IoHandlerExecutor handlerExecutor) {
+        super(handlerExecutor);
+        this.connectSelectorLoop = selectorLoopPool.getSelectorLoop();
+        this.readWriteSelectorPool = selectorLoopPool;
+    }
+
     /**
-     * Create a new instance of NioTcpClient
+     * Create a TCP client with provided selector loops pool
+     * 
+     * @param connectSelectorLoop the selector loop for handling connection events (connection of new session)
+     * @param readWriteSelectorLoop the pool of selector loop for handling read/write events of connected sessions
+     * @param handlerExecutor used for executing IoHandler event in another pool of thread (not in the low level I/O
+     *        one). Use <code>null</code> if you don't want one. Be careful, the IoHandler processing will block the I/O
+     *        operations.
      */
-    public NioTcpClient(IoHandlerExecutor ioHandlerExecutor) {
-        super(ioHandlerExecutor);
+    public NioTcpClient(SelectorLoop connectSelectorLoop, SelectorLoopPool readWriteSelectorLoop,
+            IoHandlerExecutor handlerExecutor) {
+        super(handlerExecutor);
+        this.connectSelectorLoop = connectSelectorLoop;
+        this.readWriteSelectorPool = readWriteSelectorLoop;
+        idleChecker = new IndexedIdleChecker();
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
-    public IoSessionConfig getSessionConfig() {
-        // TODO Auto-generated method stub
-        return null;
+    public IoFuture<IoSession> connect(SocketAddress remoteAddress) throws IOException {
+        Assert.assertNotNull(remoteAddress, "remoteAddress");
+
+        SocketChannel clientSocket = SocketChannel.open();
+
+        // non blocking
+        clientSocket.configureBlocking(false);
+
+        // connect to a running server
+        boolean connected = clientSocket.connect(remoteAddress);
+        final NioTcpSession session = new NioTcpSession(this, clientSocket, readWriteSelectorPool.getSelectorLoop(),
+                idleChecker);
+
+        NioTcpSession.ConnectFuture connectFuture = new NioTcpSession.ConnectFuture();
+        session.setConnectFuture(connectFuture);
+
+        if (!connected) {
+            // async connection, let the connection complete in background, the selector loop will detect when the
+            // connection is successful
+            connectSelectorLoop.register(false, true, false, false, session, clientSocket, new RegistrationCallback() {
+
+                @Override
+                public void done(SelectionKey selectionKey) {
+                    session.setSelectionKey(selectionKey);
+                }
+            });
+        } else {
+            // already connected (probably a loopback connection)
+            // register for read
+            connectSelectorLoop.register(false, false, true, false, session, clientSocket, new RegistrationCallback() {
+
+                @Override
+                public void done(SelectionKey selectionKey) {
+                    session.setSelectionKey(selectionKey);
+                }
+            });
+            session.setConnected();
+        }
+        return connectFuture;
     }
+
 }
\ No newline at end of file

Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpServer.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpServer.java?rev=1426490&r1=1426489&r2=1426490&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpServer.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpServer.java Fri Dec 28 13:01:20 2012
@@ -34,6 +34,7 @@ import org.apache.mina.service.idlecheck
 import org.apache.mina.service.idlechecker.IndexedIdleChecker;
 import org.apache.mina.transport.tcp.AbstractTcpServer;
 import org.apache.mina.transport.tcp.TcpSessionConfig;
+import org.apache.mina.util.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,8 +68,7 @@ public class NioTcpServer extends Abstra
      */
     public NioTcpServer() {
         this(new NioSelectorLoop("accept", 0),
-                new FixedSelectorLoopPool(Runtime.getRuntime().availableProcessors() + 1), new InOrderHandlerExecutor(
-                        Runtime.getRuntime().availableProcessors() + 1, Runtime.getRuntime().availableProcessors() + 1));
+                new FixedSelectorLoopPool(Runtime.getRuntime().availableProcessors() + 1), null);
     }
 
     /**
@@ -76,8 +76,7 @@ public class NioTcpServer extends Abstra
      * the OP_ACCEPT events. If the pool contains only one SelectorLoop, then all the events will be managed by the same
      * Selector.
      * 
-     * @param acceptSelectorLoop the selector loop for handling accept events (connection of new session)
-     * @param readWriteSelectorLoop the pool of selector loop for handling read/write events of connected sessions
+     * @param selectorLoopPool the selector loop pool for handling all I/O events (accept, read, write)
      * @param ioHandlerExecutor used for executing IoHandler event in another pool of thread (not in the low level I/O
      *        one). Use <code>null</code> if you don't want one. Be careful, the IoHandler processing will block the I/O
      *        operations.
@@ -130,10 +129,7 @@ public class NioTcpServer extends Abstra
      */
     @Override
     public synchronized void bind(final SocketAddress localAddress) throws IOException {
-        if (localAddress == null) {
-            // We should at least have one address to bind on
-            throw new IllegalArgumentException("LocalAdress cannot be null");
-        }
+        Assert.assertNotNull(localAddress, "localAddress");
 
         // check if the address is already bound
         if (this.address != null) {
@@ -148,7 +144,7 @@ public class NioTcpServer extends Abstra
         serverChannel.socket().bind(address);
         serverChannel.configureBlocking(false);
 
-        acceptSelectorLoop.register(true, false, false, this, serverChannel);
+        acceptSelectorLoop.register(true, false, false, false, this, serverChannel, null);
 
         idleChecker = new IndexedIdleChecker();
         idleChecker.start();
@@ -203,7 +199,8 @@ public class NioTcpServer extends Abstra
      * {@inheritDoc}
      */
     @Override
-    public void ready(final boolean accept, final boolean read, final ByteBuffer readBuffer, final boolean write) {
+    public void ready(final boolean accept, boolean connect, final boolean read, final ByteBuffer readBuffer,
+            final boolean write) {
         if (accept) {
             LOG.debug("acceptable new client");
 
@@ -291,9 +288,13 @@ public class NioTcpServer extends Abstra
         }
 
         // add the session to the queue for being added to the selector
-        readWriteSelectorLoop.register(session, false, true, false, session, socketChannel);
+        readWriteSelectorLoop.register(false, false, true, false, session, socketChannel, new RegistrationCallback() {
 
-        session.processSessionOpened();
+            @Override
+            public void done(SelectionKey selectionKey) {
+                session.setSelectionKey(selectionKey);
+            }
+        });
         session.setConnected();
         idleChecker.sessionRead(session, System.currentTimeMillis());
         idleChecker.sessionWritten(session, System.currentTimeMillis());

Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java?rev=1426490&r1=1426489&r2=1426490&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java Fri Dec 28 13:01:20 2012
@@ -24,11 +24,11 @@ import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
 import java.util.Queue;
 
 import org.apache.mina.api.IoService;
+import org.apache.mina.api.IoSession;
 import org.apache.mina.service.idlechecker.IdleChecker;
 import org.apache.mina.session.AbstractIoSession;
 import org.apache.mina.session.DefaultWriteFuture;
@@ -37,6 +37,7 @@ import org.apache.mina.session.SslHelper
 import org.apache.mina.session.WriteRequest;
 import org.apache.mina.transport.tcp.ProxyTcpSessionConfig;
 import org.apache.mina.transport.tcp.TcpSessionConfig;
+import org.apache.mina.util.AbstractIoFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,7 +48,7 @@ import org.slf4j.LoggerFactory;
  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
  * 
  */
-public class NioTcpSession extends AbstractIoSession implements NioSession, SelectorListener {
+public class NioTcpSession extends AbstractIoSession implements SelectorListener {
 
     private static final Logger LOG = LoggerFactory.getLogger(NioTcpSession.class);
 
@@ -60,6 +61,11 @@ public class NioTcpSession extends Abstr
     /** the socket configuration */
     private final TcpSessionConfig configuration;
 
+    /** the future representing this session connection operation (client only) */
+    private ConnectFuture connectFuture;
+
+    private SelectionKey selectionKey;
+
     NioTcpSession(final IoService service, final SocketChannel channel, final SelectorLoop selectorLoop,
             final IdleChecker idleChecker) {
         super(service, idleChecker);
@@ -68,6 +74,10 @@ public class NioTcpSession extends Abstr
         this.configuration = new ProxyTcpSessionConfig(channel.socket());
     }
 
+    void setConnectFuture(ConnectFuture connectFuture) {
+        this.connectFuture = connectFuture;
+    }
+
     /**
      * Get the underlying {@link SocketChannel} of this session
      * 
@@ -183,6 +193,12 @@ public class NioTcpSession extends Abstr
         }
 
         state = SessionState.CONNECTED;
+
+        if (connectFuture != null) {
+            connectFuture.complete(this);
+            connectFuture = null; // free some memory
+        }
+        processSessionOpen();
     }
 
     /**
@@ -359,7 +375,32 @@ public class NioTcpSession extends Abstr
      * {@inheritDoc}
      */
     @Override
-    public void ready(final boolean accept, final boolean read, final ByteBuffer readBuffer, final boolean write) {
+    public void ready(final boolean accept, boolean connect, final boolean read, final ByteBuffer readBuffer,
+            final boolean write) {
+        LOG.debug("session {} ready for accept={}, connect={}, read={}, write={}", new Object[] { this, accept,
+                                connect, read, write });
+        if (connect) {
+            try {
+
+                boolean isConnected = channel.finishConnect();
+                if (!isConnected) {
+                    LOG.error("unable to connect session {}", this);
+                } else {
+                    // cancel current registration for connection
+                    selectionKey.cancel();
+                    selectionKey = null;
+                    // register for reading
+                    selectorLoop.register(false, false, true, false, this, channel, null);
+                    setConnected();
+                }
+            } catch (IOException e) {
+                LOG.debug("Connection error, we cancel the future", e);
+                if (connectFuture != null) {
+                    connectFuture.error(e);
+                }
+            }
+        }
+
         if (read) {
             processRead(readBuffer);
         }
@@ -372,37 +413,29 @@ public class NioTcpSession extends Abstr
         }
     }
 
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public SelectionKey getSelectionKey() {
-        return null;
+    void setSelectionKey(SelectionKey key) {
+        this.selectionKey = key;
     }
 
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void setSelectionKey(SelectionKey key) {
-        // TODO Auto-generated method stub
-
-    }
+    static class ConnectFuture extends AbstractIoFuture<IoSession> {
 
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void setSelector(Selector selector) {
-        // TODO Auto-generated method stub
+        @Override
+        protected boolean cancelOwner(boolean mayInterruptIfRunning) {
+            return false;
+        }
 
-    }
+        /**
+         * session connected
+         */
+        public void complete(IoSession session) {
+            setResult(session);
+        }
 
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void wakeup() {
-        selectorLoop.wakeup();
+        /**
+         * connection error
+         */
+        public void error(Exception e) {
+            setException(e);
+        }
     }
 }

Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpClient.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpClient.java?rev=1426490&r1=1426489&r2=1426490&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpClient.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpClient.java Fri Dec 28 13:01:20 2012
@@ -19,6 +19,11 @@
  */
 package org.apache.mina.transport.nio;
 
+import java.io.IOException;
+import java.net.SocketAddress;
+
+import org.apache.mina.api.IoFuture;
+import org.apache.mina.api.IoSession;
 import org.apache.mina.api.IoSessionConfig;
 import org.apache.mina.service.executor.IoHandlerExecutor;
 import org.apache.mina.transport.udp.AbstractUdpClient;
@@ -41,4 +46,16 @@ public class NioUdpClient extends Abstra
         // TODO Auto-generated method stub
         return null;
     }
+
+    @Override
+    public IoFuture<IoSession> connect(SocketAddress remoteAddress) throws IOException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public IoFuture<IoSession> connect(SocketAddress remoteAddress, SocketAddress localAddress) {
+        // TODO Auto-generated method stub
+        return null;
+    }
 }

Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpServer.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpServer.java?rev=1426490&r1=1426489&r2=1426490&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpServer.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpServer.java Fri Dec 28 13:01:20 2012
@@ -129,7 +129,7 @@ public class NioUdpServer extends Abstra
         datagramChannel.socket().bind(address);
         datagramChannel.configureBlocking(false);
 
-        selectorLoop.register(false, true, false, this, datagramChannel);
+        selectorLoop.register(false, false, true, false, this, datagramChannel, null);
 
         // it's the first address bound, let's fire the event
         this.fireServiceActivated();
@@ -171,7 +171,8 @@ public class NioUdpServer extends Abstra
      * {@inheritDoc}
      */
     @Override
-    public void ready(final boolean accept, final boolean read, final ByteBuffer readBuffer, final boolean write) {
+    public void ready(final boolean accept, boolean connect, final boolean read, final ByteBuffer readBuffer,
+            final boolean write) {
         if (read) {
             try {
                 LOG.debug("readable datagram for UDP service : {}", this);

Added: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/RegistrationCallback.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/RegistrationCallback.java?rev=1426490&view=auto
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/RegistrationCallback.java (added)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/RegistrationCallback.java Fri Dec 28 13:01:20 2012
@@ -0,0 +1,8 @@
+package org.apache.mina.transport.nio;
+
+import java.nio.channels.SelectionKey;
+
+public interface RegistrationCallback {
+
+    void done(SelectionKey selectionKey);
+}

Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorListener.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorListener.java?rev=1426490&r1=1426489&r2=1426490&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorListener.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorListener.java Fri Dec 28 13:01:20 2012
@@ -22,10 +22,12 @@ package org.apache.mina.transport.nio;
 import java.nio.ByteBuffer;
 
 /**
+ * Listen for selector events.
+ * 
  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
  */
 public interface SelectorListener {
 
-    void ready(boolean accept, boolean read, ByteBuffer readBuffer, boolean write);
+    void ready(boolean accept, boolean connect, boolean read, ByteBuffer readBuffer, boolean write);
 
 }

Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorLoop.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorLoop.java?rev=1426490&r1=1426489&r2=1426490&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorLoop.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorLoop.java Fri Dec 28 13:01:20 2012
@@ -20,37 +20,22 @@ package org.apache.mina.transport.nio;
 
 import java.nio.channels.SelectableChannel;
 
-import org.apache.mina.api.IoSession;
-
 /**
  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
  */
 public interface SelectorLoop {
     /**
-     * Register a channel on a Selector, for some events. We can register for OP_ACCEPT,
-     * OP_READ or OP_WRITE.
-     * 
-     * @param session The session
-     * @param accept Registers for OP_ACCEPT events
-     * @param read Registers for OP_READ events
-     * @param write Registers for OP_WRITE events
-     * @param listener The listener
-     * @param channel
-     */
-    void register(IoSession session, boolean accept, boolean read, boolean write, SelectorListener listener,
-            SelectableChannel channel);
-
-    /**
-     * Register a channel on a Selector, for some events. We can register for OP_ACCEPT,
-     * OP_READ or OP_WRITE.
+     * Register a channel on a Selector for some events: OP_ACCEPT, OP_CONNECT, OP_READ or OP_WRITE.
      * 
      * @param accept Registers for OP_ACCEPT events
+     * @param connect Registers for OP_CONNECT events
      * @param read Registers for OP_READ events
      * @param write Registers for OP_WRITE events
      * @param listener The listener
      * @param channel
      */
-    void register(boolean accept, boolean read, boolean write, SelectorListener listener, SelectableChannel channel);
+    void register(boolean accept, boolean connect, boolean read, boolean write, SelectorListener listener,
+            SelectableChannel channel, RegistrationCallback callback);
 
     void modifyRegistration(boolean accept, boolean read, boolean write, SelectorListener listener,
             SelectableChannel channel);

Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpClient.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpClient.java?rev=1426490&r1=1426489&r2=1426490&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpClient.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpClient.java Fri Dec 28 13:01:20 2012
@@ -28,10 +28,31 @@ import org.apache.mina.service.executor.
  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
  */
 public abstract class AbstractTcpClient extends AbstractIoClient {
+
+    /** the default session configuration */
+    private TcpSessionConfig config;
+
     /**
      * Create an new AbsractTcpClient instance
      */
     protected AbstractTcpClient(IoHandlerExecutor ioHandlerExecutor) {
         super(ioHandlerExecutor);
     }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public TcpSessionConfig getSessionConfig() {
+        return this.config;
+    }
+
+    /**
+     * Set the default configuration for created TCP sessions
+     * 
+     * @param config
+     */
+    public void setSessionConfig(final TcpSessionConfig config) {
+        this.config = config;
+    }
 }

Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpClient.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpClient.java?rev=1426490&r1=1426489&r2=1426490&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpClient.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpClient.java Fri Dec 28 13:01:20 2012
@@ -19,8 +19,11 @@
  */
 package org.apache.mina.transport.udp;
 
+import java.net.SocketAddress;
+
 import javax.net.ssl.SSLException;
 
+import org.apache.mina.api.IoFuture;
 import org.apache.mina.api.IoSession;
 import org.apache.mina.service.client.AbstractIoClient;
 import org.apache.mina.service.executor.IoHandlerExecutor;
@@ -44,4 +47,15 @@ public abstract class AbstractUdpClient 
     public void initSecured(IoSession session) throws SSLException {
         // Do nothing : UDP does not support SSL
     }
+
+    /**
+     * Connects to the specified remote address binding to the specified local address.
+     * 
+     * @param remoteAddress Remote {@link SocketAddress} to connect
+     * @param localAddress Local {@link SocketAddress} to use while initiating connection to remote
+     *        {@link SocketAddress}
+     * @return the {@link IoFuture} instance which is completed when the connection attempt initiated by this call
+     *         succeeds or fails.
+     */
+    public abstract IoFuture<IoSession> connect(SocketAddress remoteAddress, SocketAddress localAddress);
 }

Modified: mina/mina/trunk/core/src/test/java/org/apache/mina/session/AbstractIoSessionTest.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/test/java/org/apache/mina/session/AbstractIoSessionTest.java?rev=1426490&r1=1426489&r2=1426490&view=diff
==============================================================================
--- mina/mina/trunk/core/src/test/java/org/apache/mina/session/AbstractIoSessionTest.java (original)
+++ mina/mina/trunk/core/src/test/java/org/apache/mina/session/AbstractIoSessionTest.java Fri Dec 28 13:01:20 2012
@@ -216,7 +216,7 @@ public class AbstractIoSessionTest {
     @Test
     public void chain_open() {
         final DummySession session = new DummySession(service);
-        session.processSessionOpened();
+        session.processSessionOpen();
         verify(filter1).sessionOpened(eq(session));
         verify(filter2).sessionOpened(eq(session));
         verify(filter3).sessionOpened(eq(session));

Modified: mina/mina/trunk/core/src/test/java/org/apache/mina/transport/tcp/NioTcpServerFilterEventTest.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/test/java/org/apache/mina/transport/tcp/NioTcpServerFilterEventTest.java?rev=1426490&r1=1426489&r2=1426490&view=diff
==============================================================================
--- mina/mina/trunk/core/src/test/java/org/apache/mina/transport/tcp/NioTcpServerFilterEventTest.java (original)
+++ mina/mina/trunk/core/src/test/java/org/apache/mina/transport/tcp/NioTcpServerFilterEventTest.java Fri Dec 28 13:01:20 2012
@@ -49,7 +49,7 @@ public class NioTcpServerFilterEventTest
 
     private static final int CLIENT_COUNT = 100;
 
-    private static final int WAIT_TIME = 5000;
+    private static final int WAIT_TIME = 30000;
 
     private final CountDownLatch msgSentLatch = new CountDownLatch(CLIENT_COUNT);
 

Modified: mina/mina/trunk/core/src/test/resources/log4j2-test.xml
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/test/resources/log4j2-test.xml?rev=1426490&r1=1426489&r2=1426490&view=diff
==============================================================================
--- mina/mina/trunk/core/src/test/resources/log4j2-test.xml (original)
+++ mina/mina/trunk/core/src/test/resources/log4j2-test.xml Fri Dec 28 13:01:20 2012
@@ -2,7 +2,7 @@
 <configuration>
   <appenders>
     <Console name="STDOUT" target="SYSTEM_OUT">
-      <PatternLayout pattern="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/>
+      <PatternLayout pattern="%d %-5p [%t] %C{2} (%F:%L) - %m%n %ex"/>
     </Console>
   </appenders>
   <loggers>
@@ -11,4 +11,4 @@
       <appender-ref ref="STDOUT"/>
     </root>
   </loggers>
-</configuration>
\ No newline at end of file
+</configuration>

Added: mina/mina/trunk/examples/src/main/java/org/apache/mina/examples/echoclient/NioEchoClient.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/examples/src/main/java/org/apache/mina/examples/echoclient/NioEchoClient.java?rev=1426490&view=auto
==============================================================================
--- mina/mina/trunk/examples/src/main/java/org/apache/mina/examples/echoclient/NioEchoClient.java (added)
+++ mina/mina/trunk/examples/src/main/java/org/apache/mina/examples/echoclient/NioEchoClient.java Fri Dec 28 13:01:20 2012
@@ -0,0 +1,95 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+
+package org.apache.mina.examples.echoclient;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.mina.api.AbstractIoHandler;
+import org.apache.mina.api.IoFuture;
+import org.apache.mina.api.IoSession;
+import org.apache.mina.examples.echoserver.NioEchoServer;
+import org.apache.mina.transport.nio.NioTcpClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A basic client test
+ * 
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ * 
+ */
+public class NioEchoClient {
+
+    static final private Logger LOG = LoggerFactory.getLogger(NioEchoServer.class);
+
+    public static void main(String[] args) {
+        LOG.info("starting echo client");
+
+        final NioTcpClient client = new NioTcpClient();
+        client.setFilters();
+        client.setIoHandler(new AbstractIoHandler() {
+            @Override
+            public void sessionOpened(final IoSession session) {
+                LOG.info("session opened {}", session);
+
+            }
+
+            @Override
+            public void messageReceived(IoSession session, Object message) {
+                LOG.info("message received {}", message);
+                if (message instanceof ByteBuffer) {
+                    LOG.info("echoing");
+                    session.write(message);
+                }
+            }
+
+            @Override
+            public void messageSent(IoSession session, Object message) {
+                LOG.info("message sent {}", message);
+            }
+
+            @Override
+            public void sessionClosed(IoSession session) {
+                LOG.info("session closed {}", session);
+            }
+        });
+
+        try {
+            IoFuture<IoSession> future = client.connect(new InetSocketAddress("localhost", 9999));
+
+            try {
+                IoSession session = future.get();
+                LOG.info("session connected : {}", session);
+            } catch (ExecutionException e) {
+                LOG.error("cannot connect : ", e);
+            }
+
+            LOG.debug("Running the client for 25 sec");
+            Thread.sleep(25000);
+        } catch (IOException e) {
+            LOG.error("connection error : {}", e);
+        } catch (InterruptedException e) {
+        }
+    }
+}