You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by jv...@apache.org on 2012/12/28 14:01:21 UTC
svn commit: r1426490 - in /mina/mina/trunk:
core/src/main/java/org/apache/mina/api/
core/src/main/java/org/apache/mina/service/client/
core/src/main/java/org/apache/mina/session/
core/src/main/java/org/apache/mina/transport/nio/ core/src/main/java/org/...
Author: jvermillard
Date: Fri Dec 28 13:01:20 2012
New Revision: 1426490
URL: http://svn.apache.org/viewvc?rev=1426490&view=rev
Log:
TCP client (WIP)
Added:
mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/RegistrationCallback.java
mina/mina/trunk/examples/src/main/java/org/apache/mina/examples/echoclient/
mina/mina/trunk/examples/src/main/java/org/apache/mina/examples/echoclient/NioEchoClient.java
Removed:
mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSession.java
Modified:
mina/mina/trunk/core/src/main/java/org/apache/mina/api/IoClient.java
mina/mina/trunk/core/src/main/java/org/apache/mina/service/client/AbstractIoClient.java
mina/mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java
mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpClient.java
mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpServer.java
mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java
mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpClient.java
mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpServer.java
mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorListener.java
mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorLoop.java
mina/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpClient.java
mina/mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpClient.java
mina/mina/trunk/core/src/test/java/org/apache/mina/session/AbstractIoSessionTest.java
mina/mina/trunk/core/src/test/java/org/apache/mina/transport/tcp/NioTcpServerFilterEventTest.java
mina/mina/trunk/core/src/test/resources/log4j2-test.xml
Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/api/IoClient.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/api/IoClient.java?rev=1426490&r1=1426489&r2=1426490&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/api/IoClient.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/api/IoClient.java Fri Dec 28 13:01:20 2012
@@ -19,48 +19,37 @@
*/
package org.apache.mina.api;
+import java.io.IOException;
import java.net.SocketAddress;
/**
- * Connects to endpoint, communicates with the server, and fires events to
- * {@link org.apache.mina.service.IoHandler}s.
- *
+ * Connects to endpoint, communicates with the server, and fires events to {@link org.apache.mina.service.IoHandler}s.
+ *
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
public interface IoClient extends IoService {
/**
- * Returns the connect timeout in milliseconds. The default value is 1
- * minute.
- *
+ * Returns the connect timeout in milliseconds. The default value is 1 minute.
+ *
* @return the connect timeout in milliseconds
*/
long getConnectTimeoutMillis();
/**
* Sets the connect timeout in milliseconds. The default value is 1 minute.
- *
+ *
* @param connectTimeoutInMillis Connection timeout in ms
*/
void setConnectTimeoutMillis(long connectTimeoutInMillis);
/**
* Connects to the specified remote address.
- *
+ *
* @param remoteAddress Remote {@link SocketAddress} to connect
- * @return the {@link IoFuture} instance which is completed when the
- * connection attempt initiated by this call succeeds or fails.
+ * @return the {@link IoFuture} instance which is completed when the connection attempt initiated by this call
+ * succeeds or fails.
+ * @throws IOException
*/
- IoFuture<IoSession> connect(SocketAddress remoteAddress);
+ IoFuture<IoSession> connect(SocketAddress remoteAddress) throws IOException;
- /**
- * Connects to the specified remote address binding to the specified local
- * address.
- *
- * @param remoteAddress Remote {@link SocketAddress} to connect
- * @param localAddress Local {@link SocketAddress} to use while initiating connection to
- * remote {@link SocketAddress}
- * @return the {@link IoFuture} instance which is completed when the
- * connection attempt initiated by this call succeeds or fails.
- */
- IoFuture<IoSession> connect(SocketAddress remoteAddress, SocketAddress localAddress);
}
Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/service/client/AbstractIoClient.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/service/client/AbstractIoClient.java?rev=1426490&r1=1426489&r2=1426490&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/service/client/AbstractIoClient.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/service/client/AbstractIoClient.java Fri Dec 28 13:01:20 2012
@@ -19,11 +19,7 @@
*/
package org.apache.mina.service.client;
-import java.net.SocketAddress;
-
import org.apache.mina.api.IoClient;
-import org.apache.mina.api.IoFuture;
-import org.apache.mina.api.IoSession;
import org.apache.mina.service.AbstractIoService;
import org.apache.mina.service.executor.IoHandlerExecutor;
@@ -33,6 +29,9 @@ import org.apache.mina.service.executor.
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
public abstract class AbstractIoClient extends AbstractIoService implements IoClient {
+
+ private long connectTimeoutInMillis = 10000;
+
/**
* Create a new AbstractIoClient instance
*/
@@ -42,21 +41,12 @@ public abstract class AbstractIoClient e
@Override
public long getConnectTimeoutMillis() {
- return 0;
+ return connectTimeoutInMillis;
}
@Override
public void setConnectTimeoutMillis(long connectTimeoutInMillis) {
- }
-
- @Override
- public IoFuture<IoSession> connect(SocketAddress remoteAddress) {
- return null;
- }
-
- @Override
- public IoFuture<IoSession> connect(SocketAddress remoteAddress, SocketAddress localAddress) {
- return null;
+ this.connectTimeoutInMillis = connectTimeoutInMillis;
}
}
Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java?rev=1426490&r1=1426489&r2=1426490&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java Fri Dec 28 13:01:20 2012
@@ -643,9 +643,9 @@ public abstract class AbstractIoSession
}
/**
- * process session opened event using the filter chain. To be called by the session {@link SelectorLoop} .
+ * process session open event using the filter chain. To be called by the session {@link SelectorLoop} .
*/
- public void processSessionOpened() {
+ public void processSessionOpen() {
LOG.debug("processing session open event");
try {
Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java?rev=1426490&r1=1426489&r2=1426490&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java Fri Dec 28 13:01:20 2012
@@ -29,7 +29,6 @@ import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
-import org.apache.mina.api.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -101,25 +100,20 @@ public class NioSelectorLoop implements
* {@inheritDoc}
*/
@Override
- public void register(boolean accept, boolean read, boolean write, SelectorListener listener,
- SelectableChannel channel) {
- register(null, accept, read, write, listener, channel);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void register(IoSession session, boolean accept, boolean read, boolean write, SelectorListener listener,
- SelectableChannel channel) {
- logger.debug("registering : {} for accept : {}, read : {}, write : {}", new Object[] { listener, accept, read,
- write });
+ public void register(boolean accept, boolean connect, boolean read, boolean write, SelectorListener listener,
+ SelectableChannel channel, RegistrationCallback callback) {
+ logger.debug("registering : {} for accept : {}, connect: {}, read : {}, write : {}, channel : {}",
+ new Object[] { listener, accept, connect, read, write, channel });
int ops = 0;
if (accept) {
ops |= SelectionKey.OP_ACCEPT;
}
+ if (connect) {
+ ops |= SelectionKey.OP_CONNECT;
+ }
+
if (read) {
ops |= SelectionKey.OP_READ;
}
@@ -129,7 +123,7 @@ public class NioSelectorLoop implements
}
// TODO : if it's the same selector/worker, we don't need to do that we could directly enqueue
- registrationQueue.add(new Registration(session, ops, channel, listener));
+ registrationQueue.add(new Registration(ops, channel, listener, callback));
// Now, wakeup the selector in order to let it update the selectionKey status
selector.wakeup();
@@ -141,8 +135,8 @@ public class NioSelectorLoop implements
@Override
public void modifyRegistration(final boolean accept, final boolean read, final boolean write,
final SelectorListener listener, final SelectableChannel channel) {
- logger.debug("modifying registration : {} for accept : {}, read : {}, write : {}", new Object[] { listener,
- accept, read, write });
+ logger.debug("modifying registration : {} for accept : {}, read : {}, write : {}, channel : {}", new Object[] {
+ listener, accept, read, write, channel });
final SelectionKey key = channel.keyFor(selector);
if (key == null) {
@@ -161,7 +155,9 @@ public class NioSelectorLoop implements
ops |= SelectionKey.OP_WRITE;
}
key.interestOps(ops);
- key.selector().wakeup();
+
+ // we need to wakeup for the registration to be modified (TODO : not needed if we are in the worker thread)
+ selector.wakeup();
}
/**
@@ -205,10 +201,12 @@ public class NioSelectorLoop implements
while (it.hasNext()) {
final SelectionKey key = it.next();
final SelectorListener listener = (SelectorListener) key.attachment();
- listener.ready(key.isAcceptable(), key.isReadable(), key.isReadable() ? readBuffer : null,
- key.isWritable());
+ logger.debug("key : {}", key);
+ listener.ready(key.isAcceptable(), key.isConnectable(), key.isReadable(),
+ key.isReadable() ? readBuffer : null, key.isWritable());
// if you don't remove the event of the set, the selector will present you this event again and
// again
+ logger.debug("remove");
it.remove();
}
@@ -219,10 +217,8 @@ public class NioSelectorLoop implements
try {
SelectionKey selectionKey = reg.channel.register(selector, reg.ops, reg.listener);
- IoSession session = reg.getSession();
-
- if (session != null) {
- ((NioSession) session).setSelectionKey(selectionKey);
+ if (reg.getCallback() != null) {
+ reg.getCallback().done(selectionKey);
}
} catch (final ClosedChannelException ex) {
// dead session..
@@ -236,13 +232,18 @@ public class NioSelectorLoop implements
}
}
- private class Registration {
+ @Override
+ public void wakeup() {
+ selector.wakeup();
+ }
+
+ private static class Registration {
- public Registration(IoSession session, int ops, SelectableChannel channel, SelectorListener listener) {
+ public Registration(int ops, SelectableChannel channel, SelectorListener listener, RegistrationCallback callback) {
this.ops = ops;
this.channel = channel;
this.listener = listener;
- this.session = session;
+ this.callback = callback;
}
private final int ops;
@@ -251,15 +252,10 @@ public class NioSelectorLoop implements
private final SelectorListener listener;
- private final IoSession session;
+ private final RegistrationCallback callback;
- public IoSession getSession() {
- return session;
+ public RegistrationCallback getCallback() {
+ return callback;
}
}
-
- @Override
- public void wakeup() {
- selector.wakeup();
- }
}
\ No newline at end of file
Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpClient.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpClient.java?rev=1426490&r1=1426489&r2=1426490&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpClient.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpClient.java Fri Dec 28 13:01:20 2012
@@ -19,26 +19,126 @@
*/
package org.apache.mina.transport.nio;
-import org.apache.mina.api.IoSessionConfig;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+import org.apache.mina.api.IoFuture;
+import org.apache.mina.api.IoSession;
+import org.apache.mina.service.executor.InOrderHandlerExecutor;
import org.apache.mina.service.executor.IoHandlerExecutor;
+import org.apache.mina.service.idlechecker.IdleChecker;
+import org.apache.mina.service.idlechecker.IndexedIdleChecker;
import org.apache.mina.transport.tcp.AbstractTcpClient;
+import org.apache.mina.util.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * TODO
+ * This class implements a TCP NIO based client.
*
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
public class NioTcpClient extends AbstractTcpClient {
+
+ /** A logger for this class */
+ static final Logger LOG = LoggerFactory.getLogger(NioTcpClient.class);
+
+ // the SelectorLoop for connecting the sessions
+ private final SelectorLoop connectSelectorLoop;
+
+ // the Selectorloop for handling read/write session events
+ private final SelectorLoopPool readWriteSelectorPool;
+
+ // for detecting idle session
+ private IdleChecker idleChecker;
+
+ /**
+ * Create a TCP client with new selector pool of default size and a {@link IoHandlerExecutor} of default type (
+ * {@link InOrderHandlerExecutor})
+ */
+ public NioTcpClient() {
+ this(new NioSelectorLoop("connect", 0), new FixedSelectorLoopPool(
+ Runtime.getRuntime().availableProcessors() + 1), null);
+ }
+
+ /**
+ * Create a TCP client with provided selector loops pool. We will use one SelectorLoop get from the pool to manage
+ * the OP_CONNECT events. If the pool contains only one SelectorLoop, then all the events will be managed by the
+ * same Selector.
+ *
+ * @param selectorLoopPool the pool of selector loops used both for connection events and for read/write events of connected sessions
+ * @param handlerExecutor used for executing IoHandler events in another pool of threads (not in the low level I/O
+ * one). Use <code>null</code> if you don't want one. Be careful, the IoHandler processing will block the I/O
+ * operations.
+ */
+ public NioTcpClient(SelectorLoopPool selectorLoopPool, IoHandlerExecutor handlerExecutor) {
+ super(handlerExecutor);
+ this.connectSelectorLoop = selectorLoopPool.getSelectorLoop();
+ this.readWriteSelectorPool = selectorLoopPool;
+ }
+
/**
- * Create a new instance of NioTcpClient
+ * Create a TCP client with provided selector loops pool
+ *
+ * @param connectSelectorLoop the selector loop for handling connection events (connection of new session)
+ * @param readWriteSelectorLoop the pool of selector loop for handling read/write events of connected sessions
+ * @param ioHandlerExecutor used for executing IoHandler event in another pool of thread (not in the low level I/O
+ * one). Use <code>null</code> if you don't want one. Be careful, the IoHandler processing will block the I/O
+ * operations.
*/
- public NioTcpClient(IoHandlerExecutor ioHandlerExecutor) {
- super(ioHandlerExecutor);
+ public NioTcpClient(SelectorLoop connectSelectorLoop, SelectorLoopPool readWriteSelectorLoop,
+ IoHandlerExecutor handlerExecutor) {
+ super(handlerExecutor);
+ this.connectSelectorLoop = connectSelectorLoop;
+ this.readWriteSelectorPool = readWriteSelectorLoop;
+ idleChecker = new IndexedIdleChecker();
}
+ /**
+ * {@inheritDoc}
+ */
@Override
- public IoSessionConfig getSessionConfig() {
- // TODO Auto-generated method stub
- return null;
+ public IoFuture<IoSession> connect(SocketAddress remoteAddress) throws IOException {
+ Assert.assertNotNull(remoteAddress, "remoteAddress");
+
+ SocketChannel clientSocket = SocketChannel.open();
+
+ // non blocking
+ clientSocket.configureBlocking(false);
+
+ // connect to a running server
+ boolean connected = clientSocket.connect(remoteAddress);
+ final NioTcpSession session = new NioTcpSession(this, clientSocket, readWriteSelectorPool.getSelectorLoop(),
+ idleChecker);
+
+ NioTcpSession.ConnectFuture connectFuture = new NioTcpSession.ConnectFuture();
+ session.setConnectFuture(connectFuture);
+
+ if (!connected) {
+ // async connection, let the connection complete in the background; the selector loop will detect when the
+ // connection is successful
+ connectSelectorLoop.register(false, true, false, false, session, clientSocket, new RegistrationCallback() {
+
+ @Override
+ public void done(SelectionKey selectionKey) {
+ session.setSelectionKey(selectionKey);
+ }
+ });
+ } else {
+ // already connected (probably a loopback connection)
+ // register for read
+ connectSelectorLoop.register(false, false, true, false, session, clientSocket, new RegistrationCallback() {
+
+ @Override
+ public void done(SelectionKey selectionKey) {
+ session.setSelectionKey(selectionKey);
+ }
+ });
+ session.setConnected();
+ }
+ return connectFuture;
}
+
}
\ No newline at end of file
Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpServer.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpServer.java?rev=1426490&r1=1426489&r2=1426490&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpServer.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpServer.java Fri Dec 28 13:01:20 2012
@@ -34,6 +34,7 @@ import org.apache.mina.service.idlecheck
import org.apache.mina.service.idlechecker.IndexedIdleChecker;
import org.apache.mina.transport.tcp.AbstractTcpServer;
import org.apache.mina.transport.tcp.TcpSessionConfig;
+import org.apache.mina.util.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,8 +68,7 @@ public class NioTcpServer extends Abstra
*/
public NioTcpServer() {
this(new NioSelectorLoop("accept", 0),
- new FixedSelectorLoopPool(Runtime.getRuntime().availableProcessors() + 1), new InOrderHandlerExecutor(
- Runtime.getRuntime().availableProcessors() + 1, Runtime.getRuntime().availableProcessors() + 1));
+ new FixedSelectorLoopPool(Runtime.getRuntime().availableProcessors() + 1), null);
}
/**
@@ -76,8 +76,7 @@ public class NioTcpServer extends Abstra
* the OP_ACCEPT events. If the pool contains only one SelectorLoop, then all the events will be managed by the same
* Selector.
*
- * @param acceptSelectorLoop the selector loop for handling accept events (connection of new session)
- * @param readWriteSelectorLoop the pool of selector loop for handling read/write events of connected sessions
+ * @param selectorLoopPool the selector loop pool for handling all I/O events (accept, read, write)
* @param ioHandlerExecutor used for executing IoHandler event in another pool of thread (not in the low level I/O
* one). Use <code>null</code> if you don't want one. Be careful, the IoHandler processing will block the I/O
* operations.
@@ -130,10 +129,7 @@ public class NioTcpServer extends Abstra
*/
@Override
public synchronized void bind(final SocketAddress localAddress) throws IOException {
- if (localAddress == null) {
- // We should at least have one address to bind on
- throw new IllegalArgumentException("LocalAdress cannot be null");
- }
+ Assert.assertNotNull(localAddress, "localAddress");
// check if the address is already bound
if (this.address != null) {
@@ -148,7 +144,7 @@ public class NioTcpServer extends Abstra
serverChannel.socket().bind(address);
serverChannel.configureBlocking(false);
- acceptSelectorLoop.register(true, false, false, this, serverChannel);
+ acceptSelectorLoop.register(true, false, false, false, this, serverChannel, null);
idleChecker = new IndexedIdleChecker();
idleChecker.start();
@@ -203,7 +199,8 @@ public class NioTcpServer extends Abstra
* {@inheritDoc}
*/
@Override
- public void ready(final boolean accept, final boolean read, final ByteBuffer readBuffer, final boolean write) {
+ public void ready(final boolean accept, boolean connect, final boolean read, final ByteBuffer readBuffer,
+ final boolean write) {
if (accept) {
LOG.debug("acceptable new client");
@@ -291,9 +288,13 @@ public class NioTcpServer extends Abstra
}
// add the session to the queue for being added to the selector
- readWriteSelectorLoop.register(session, false, true, false, session, socketChannel);
+ readWriteSelectorLoop.register(false, false, true, false, session, socketChannel, new RegistrationCallback() {
- session.processSessionOpened();
+ @Override
+ public void done(SelectionKey selectionKey) {
+ session.setSelectionKey(selectionKey);
+ }
+ });
session.setConnected();
idleChecker.sessionRead(session, System.currentTimeMillis());
idleChecker.sessionWritten(session, System.currentTimeMillis());
Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java?rev=1426490&r1=1426489&r2=1426490&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java Fri Dec 28 13:01:20 2012
@@ -24,11 +24,11 @@ import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Queue;
import org.apache.mina.api.IoService;
+import org.apache.mina.api.IoSession;
import org.apache.mina.service.idlechecker.IdleChecker;
import org.apache.mina.session.AbstractIoSession;
import org.apache.mina.session.DefaultWriteFuture;
@@ -37,6 +37,7 @@ import org.apache.mina.session.SslHelper
import org.apache.mina.session.WriteRequest;
import org.apache.mina.transport.tcp.ProxyTcpSessionConfig;
import org.apache.mina.transport.tcp.TcpSessionConfig;
+import org.apache.mina.util.AbstractIoFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,7 +48,7 @@ import org.slf4j.LoggerFactory;
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*
*/
-public class NioTcpSession extends AbstractIoSession implements NioSession, SelectorListener {
+public class NioTcpSession extends AbstractIoSession implements SelectorListener {
private static final Logger LOG = LoggerFactory.getLogger(NioTcpSession.class);
@@ -60,6 +61,11 @@ public class NioTcpSession extends Abstr
/** the socket configuration */
private final TcpSessionConfig configuration;
+ /** the future representing this session connection operation (client only) */
+ private ConnectFuture connectFuture;
+
+ private SelectionKey selectionKey;
+
NioTcpSession(final IoService service, final SocketChannel channel, final SelectorLoop selectorLoop,
final IdleChecker idleChecker) {
super(service, idleChecker);
@@ -68,6 +74,10 @@ public class NioTcpSession extends Abstr
this.configuration = new ProxyTcpSessionConfig(channel.socket());
}
+ void setConnectFuture(ConnectFuture connectFuture) {
+ this.connectFuture = connectFuture;
+ }
+
/**
* Get the underlying {@link SocketChannel} of this session
*
@@ -183,6 +193,12 @@ public class NioTcpSession extends Abstr
}
state = SessionState.CONNECTED;
+
+ if (connectFuture != null) {
+ connectFuture.complete(this);
+ connectFuture = null; // free some memory
+ }
+ processSessionOpen();
}
/**
@@ -359,7 +375,32 @@ public class NioTcpSession extends Abstr
* {@inheritDoc}
*/
@Override
- public void ready(final boolean accept, final boolean read, final ByteBuffer readBuffer, final boolean write) {
+ public void ready(final boolean accept, boolean connect, final boolean read, final ByteBuffer readBuffer,
+ final boolean write) {
+ LOG.debug("session {} ready for accept={}, connect={}, read={}, write={}", new Object[] { this, accept,
+ connect, read, write });
+ if (connect) {
+ try {
+
+ boolean isConnected = channel.finishConnect();
+ if (!isConnected) {
+ LOG.error("unable to connect session {}", this);
+ } else {
+ // cancel current registration for connection
+ selectionKey.cancel();
+ selectionKey = null;
+ // register for reading
+ selectorLoop.register(false, false, true, false, this, channel, null);
+ setConnected();
+ }
+ } catch (IOException e) {
+ LOG.debug("Connection error, we cancel the future", e);
+ if (connectFuture != null) {
+ connectFuture.error(e);
+ }
+ }
+ }
+
if (read) {
processRead(readBuffer);
}
@@ -372,37 +413,29 @@ public class NioTcpSession extends Abstr
}
}
- /**
- * {@inheritDoc}
- */
- @Override
- public SelectionKey getSelectionKey() {
- return null;
+ void setSelectionKey(SelectionKey key) {
+ this.selectionKey = key;
}
- /**
- * {@inheritDoc}
- */
- @Override
- public void setSelectionKey(SelectionKey key) {
- // TODO Auto-generated method stub
-
- }
+ static class ConnectFuture extends AbstractIoFuture<IoSession> {
- /**
- * {@inheritDoc}
- */
- @Override
- public void setSelector(Selector selector) {
- // TODO Auto-generated method stub
+ @Override
+ protected boolean cancelOwner(boolean mayInterruptIfRunning) {
+ return false;
+ }
- }
+ /**
+ * session connected
+ */
+ public void complete(IoSession session) {
+ setResult(session);
+ }
- /**
- * {@inheritDoc}
- */
- @Override
- public void wakeup() {
- selectorLoop.wakeup();
+ /**
+ * connection error
+ */
+ public void error(Exception e) {
+ setException(e);
+ }
}
}
Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpClient.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpClient.java?rev=1426490&r1=1426489&r2=1426490&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpClient.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpClient.java Fri Dec 28 13:01:20 2012
@@ -19,6 +19,11 @@
*/
package org.apache.mina.transport.nio;
+import java.io.IOException;
+import java.net.SocketAddress;
+
+import org.apache.mina.api.IoFuture;
+import org.apache.mina.api.IoSession;
import org.apache.mina.api.IoSessionConfig;
import org.apache.mina.service.executor.IoHandlerExecutor;
import org.apache.mina.transport.udp.AbstractUdpClient;
@@ -41,4 +46,16 @@ public class NioUdpClient extends Abstra
// TODO Auto-generated method stub
return null;
}
+
+ @Override
+ public IoFuture<IoSession> connect(SocketAddress remoteAddress) throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public IoFuture<IoSession> connect(SocketAddress remoteAddress, SocketAddress localAddress) {
+ // TODO Auto-generated method stub
+ return null;
+ }
}
Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpServer.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpServer.java?rev=1426490&r1=1426489&r2=1426490&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpServer.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpServer.java Fri Dec 28 13:01:20 2012
@@ -129,7 +129,7 @@ public class NioUdpServer extends Abstra
datagramChannel.socket().bind(address);
datagramChannel.configureBlocking(false);
- selectorLoop.register(false, true, false, this, datagramChannel);
+ selectorLoop.register(false, false, true, false, this, datagramChannel, null);
// it's the first address bound, let's fire the event
this.fireServiceActivated();
@@ -171,7 +171,8 @@ public class NioUdpServer extends Abstra
* {@inheritDoc}
*/
@Override
- public void ready(final boolean accept, final boolean read, final ByteBuffer readBuffer, final boolean write) {
+ public void ready(final boolean accept, boolean connect, final boolean read, final ByteBuffer readBuffer,
+ final boolean write) {
if (read) {
try {
LOG.debug("readable datagram for UDP service : {}", this);
Added: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/RegistrationCallback.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/RegistrationCallback.java?rev=1426490&view=auto
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/RegistrationCallback.java (added)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/RegistrationCallback.java Fri Dec 28 13:01:20 2012
@@ -0,0 +1,8 @@
+package org.apache.mina.transport.nio;
+
+import java.nio.channels.SelectionKey;
+
+public interface RegistrationCallback {
+
+ void done(SelectionKey selectionKey);
+}
Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorListener.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorListener.java?rev=1426490&r1=1426489&r2=1426490&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorListener.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorListener.java Fri Dec 28 13:01:20 2012
@@ -22,10 +22,12 @@ package org.apache.mina.transport.nio;
import java.nio.ByteBuffer;
/**
+ * Listen for selector events.
+ *
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
public interface SelectorListener {
- void ready(boolean accept, boolean read, ByteBuffer readBuffer, boolean write);
+ void ready(boolean accept, boolean connect, boolean read, ByteBuffer readBuffer, boolean write);
}
Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorLoop.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorLoop.java?rev=1426490&r1=1426489&r2=1426490&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorLoop.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorLoop.java Fri Dec 28 13:01:20 2012
@@ -20,37 +20,22 @@ package org.apache.mina.transport.nio;
import java.nio.channels.SelectableChannel;
-import org.apache.mina.api.IoSession;
-
/**
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
public interface SelectorLoop {
/**
- * Register a channel on a Selector, for some events. We can register for OP_ACCEPT,
- * OP_READ or OP_WRITE.
- *
- * @param session The session
- * @param accept Registers for OP_ACCEPT events
- * @param read Registers for OP_READ events
- * @param write Registers for OP_WRITE events
- * @param listener The listener
- * @param channel
- */
- void register(IoSession session, boolean accept, boolean read, boolean write, SelectorListener listener,
- SelectableChannel channel);
-
- /**
- * Register a channel on a Selector, for some events. We can register for OP_ACCEPT,
- * OP_READ or OP_WRITE.
+ * Register a channel on a Selector, for some events. We can register for OP_ACCEPT, OP_CONNECT, OP_READ or OP_WRITE.
*
* @param accept Registers for OP_ACCEPT events
+ * @param connect Registers for OP_CONNECT events
* @param read Registers for OP_READ events
* @param write Registers for OP_WRITE events
* @param listener The listener
* @param channel
*/
- void register(boolean accept, boolean read, boolean write, SelectorListener listener, SelectableChannel channel);
+ void register(boolean accept, boolean connect, boolean read, boolean write, SelectorListener listener,
+ SelectableChannel channel, RegistrationCallback callback);
void modifyRegistration(boolean accept, boolean read, boolean write, SelectorListener listener,
SelectableChannel channel);
Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpClient.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpClient.java?rev=1426490&r1=1426489&r2=1426490&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpClient.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpClient.java Fri Dec 28 13:01:20 2012
@@ -28,10 +28,31 @@ import org.apache.mina.service.executor.
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
public abstract class AbstractTcpClient extends AbstractIoClient {
+
+ /** the default session configuration; stays null until {@link #setSessionConfig} is called */
+ private TcpSessionConfig config;
+
/**
* Create an new AbsractTcpClient instance
*/
protected AbstractTcpClient(IoHandlerExecutor ioHandlerExecutor) {
super(ioHandlerExecutor);
}
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public TcpSessionConfig getSessionConfig() {
+ return this.config;
+ }
+
+ /**
+ * Set the default configuration for created TCP sessions
+ *
+ * @param config the default configuration to use for created TCP sessions
+ */
+ public void setSessionConfig(final TcpSessionConfig config) {
+ this.config = config;
+ }
}
Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpClient.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpClient.java?rev=1426490&r1=1426489&r2=1426490&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpClient.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpClient.java Fri Dec 28 13:01:20 2012
@@ -19,8 +19,11 @@
*/
package org.apache.mina.transport.udp;
+import java.net.SocketAddress;
+
import javax.net.ssl.SSLException;
+import org.apache.mina.api.IoFuture;
import org.apache.mina.api.IoSession;
import org.apache.mina.service.client.AbstractIoClient;
import org.apache.mina.service.executor.IoHandlerExecutor;
@@ -44,4 +47,15 @@ public abstract class AbstractUdpClient
public void initSecured(IoSession session) throws SSLException {
// Do nothing : UDP does not support SSL
}
+
+ /**
+ * Connects to the specified remote address binding to the specified local address.
+ *
+ * @param remoteAddress Remote {@link SocketAddress} to connect
+ * @param localAddress Local {@link SocketAddress} to use while initiating connection to remote
+ * {@link SocketAddress}
+ * @return the {@link IoFuture} instance which is completed when the connection attempt initiated by this call
+ * succeeds or fails.
+ */
+ public abstract IoFuture<IoSession> connect(SocketAddress remoteAddress, SocketAddress localAddress);
}
Modified: mina/mina/trunk/core/src/test/java/org/apache/mina/session/AbstractIoSessionTest.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/test/java/org/apache/mina/session/AbstractIoSessionTest.java?rev=1426490&r1=1426489&r2=1426490&view=diff
==============================================================================
--- mina/mina/trunk/core/src/test/java/org/apache/mina/session/AbstractIoSessionTest.java (original)
+++ mina/mina/trunk/core/src/test/java/org/apache/mina/session/AbstractIoSessionTest.java Fri Dec 28 13:01:20 2012
@@ -216,7 +216,7 @@ public class AbstractIoSessionTest {
@Test
public void chain_open() {
final DummySession session = new DummySession(service);
- session.processSessionOpened();
+ session.processSessionOpen();
verify(filter1).sessionOpened(eq(session));
verify(filter2).sessionOpened(eq(session));
verify(filter3).sessionOpened(eq(session));
Modified: mina/mina/trunk/core/src/test/java/org/apache/mina/transport/tcp/NioTcpServerFilterEventTest.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/test/java/org/apache/mina/transport/tcp/NioTcpServerFilterEventTest.java?rev=1426490&r1=1426489&r2=1426490&view=diff
==============================================================================
--- mina/mina/trunk/core/src/test/java/org/apache/mina/transport/tcp/NioTcpServerFilterEventTest.java (original)
+++ mina/mina/trunk/core/src/test/java/org/apache/mina/transport/tcp/NioTcpServerFilterEventTest.java Fri Dec 28 13:01:20 2012
@@ -49,7 +49,7 @@ public class NioTcpServerFilterEventTest
private static final int CLIENT_COUNT = 100;
- private static final int WAIT_TIME = 5000;
+ private static final int WAIT_TIME = 30000;
private final CountDownLatch msgSentLatch = new CountDownLatch(CLIENT_COUNT);
Modified: mina/mina/trunk/core/src/test/resources/log4j2-test.xml
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/test/resources/log4j2-test.xml?rev=1426490&r1=1426489&r2=1426490&view=diff
==============================================================================
--- mina/mina/trunk/core/src/test/resources/log4j2-test.xml (original)
+++ mina/mina/trunk/core/src/test/resources/log4j2-test.xml Fri Dec 28 13:01:20 2012
@@ -2,7 +2,7 @@
<configuration>
<appenders>
<Console name="STDOUT" target="SYSTEM_OUT">
- <PatternLayout pattern="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/>
+ <PatternLayout pattern="%d %-5p [%t] %C{2} (%F:%L) - %m%n %ex"/>
</Console>
</appenders>
<loggers>
@@ -11,4 +11,4 @@
<appender-ref ref="STDOUT"/>
</root>
</loggers>
-</configuration>
\ No newline at end of file
+</configuration>
Added: mina/mina/trunk/examples/src/main/java/org/apache/mina/examples/echoclient/NioEchoClient.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/examples/src/main/java/org/apache/mina/examples/echoclient/NioEchoClient.java?rev=1426490&view=auto
==============================================================================
--- mina/mina/trunk/examples/src/main/java/org/apache/mina/examples/echoclient/NioEchoClient.java (added)
+++ mina/mina/trunk/examples/src/main/java/org/apache/mina/examples/echoclient/NioEchoClient.java Fri Dec 28 13:01:20 2012
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.mina.examples.echoclient;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.mina.api.AbstractIoHandler;
+import org.apache.mina.api.IoFuture;
+import org.apache.mina.api.IoSession;
+import org.apache.mina.examples.echoserver.NioEchoServer;
+import org.apache.mina.transport.nio.NioTcpClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A basic client test
+ *
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ *
+ */
+public class NioEchoClient {
+
+ static final private Logger LOG = LoggerFactory.getLogger(NioEchoServer.class);
+
+ public static void main(String[] args) {
+ LOG.info("starting echo client");
+
+ final NioTcpClient client = new NioTcpClient();
+ client.setFilters();
+ client.setIoHandler(new AbstractIoHandler() {
+ @Override
+ public void sessionOpened(final IoSession session) {
+ LOG.info("session opened {}", session);
+
+ }
+
+ @Override
+ public void messageReceived(IoSession session, Object message) {
+ LOG.info("message received {}", message);
+ if (message instanceof ByteBuffer) {
+ LOG.info("echoing");
+ session.write(message);
+ }
+ }
+
+ @Override
+ public void messageSent(IoSession session, Object message) {
+ LOG.info("message sent {}", message);
+ }
+
+ @Override
+ public void sessionClosed(IoSession session) {
+ LOG.info("session closed {}", session);
+ }
+ });
+
+ try {
+ IoFuture<IoSession> future = client.connect(new InetSocketAddress("localhost", 9999));
+
+ try {
+ IoSession session = future.get();
+ LOG.info("session connected : {}", session);
+ } catch (ExecutionException e) {
+ LOG.error("cannot connect : ", e);
+ }
+
+ LOG.debug("Running the client for 25 sec");
+ Thread.sleep(25000);
+ } catch (IOException e) {
+ LOG.error("connection error : {}", e);
+ } catch (InterruptedException e) {
+ }
+ }
+}