You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by el...@apache.org on 2013/03/25 00:32:27 UTC

[2/7] o Moved the processWrite method into the AbstraIoSession class

http://git-wip-us.apache.org/repos/asf/mina/blob/0de06254/core/src/main/java/org/apache/mina/transport/nio/tcp/NioTcpServer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mina/transport/nio/tcp/NioTcpServer.java b/core/src/main/java/org/apache/mina/transport/nio/tcp/NioTcpServer.java
new file mode 100644
index 0000000..fdffd7a
--- /dev/null
+++ b/core/src/main/java/org/apache/mina/transport/nio/tcp/NioTcpServer.java
@@ -0,0 +1,351 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.transport.nio.tcp;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+
+import org.apache.mina.api.IdleStatus;
+import org.apache.mina.service.executor.IoHandlerExecutor;
+import org.apache.mina.service.executor.OrderedHandlerExecutor;
+import org.apache.mina.service.idlechecker.IdleChecker;
+import org.apache.mina.service.idlechecker.IndexedIdleChecker;
+import org.apache.mina.transport.nio.FixedSelectorLoopPool;
+import org.apache.mina.transport.nio.NioSelectorLoop;
+import org.apache.mina.transport.nio.RegistrationCallback;
+import org.apache.mina.transport.nio.SelectorListener;
+import org.apache.mina.transport.nio.SelectorLoop;
+import org.apache.mina.transport.nio.SelectorLoopPool;
+import org.apache.mina.transport.tcp.AbstractTcpServer;
+import org.apache.mina.transport.tcp.TcpSessionConfig;
+import org.apache.mina.util.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements a TCP NIO based server.
+ * 
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ */
+public class NioTcpServer extends AbstractTcpServer implements SelectorListener {
+    /** A logger for this class */
+    static final Logger LOG = LoggerFactory.getLogger(NioTcpServer.class);
+
+    /** the bound local address */
+    private SocketAddress address = null;
+
+    private final SelectorLoop acceptSelectorLoop;
+
+    private final SelectorLoopPool readWriteSelectorPool;
+
+    // the key used for selecting accept event
+    private SelectionKey acceptKey = null;
+
+    // the server socket for accepting clients
+    private ServerSocketChannel serverChannel = null;
+
+    private IdleChecker idleChecker;
+
+    /**
+     * Create a TCP server with new selector pool of default size and a {@link IoHandlerExecutor} of default type (
+     * {@link OrderedHandlerExecutor})
+     */
+    public NioTcpServer() {
+        this(new NioSelectorLoop("accept", 0), new FixedSelectorLoopPool("Server", Runtime.getRuntime()
+                .availableProcessors() + 1), null);
+    }
+
+    /**
+     * Create a TCP server with new selector pool of default size and a {@link IoHandlerExecutor} of default type (
+     * {@link OrderedHandlerExecutor})
+     * 
+     * @param config The specific configuration to use
+     */
+    public NioTcpServer(TcpSessionConfig config) {
+        this(config, new NioSelectorLoop("accept", 0), new FixedSelectorLoopPool("Server", Runtime.getRuntime()
+                .availableProcessors() + 1), null);
+    }
+
+    /**
+     * Create a TCP server with provided selector loops pool. We will use one SelectorLoop get from the pool to manage
+     * the OP_ACCEPT events. If the pool contains only one SelectorLoop, then all the events will be managed by the same
+     * Selector.
+     * 
+     * @param selectorLoopPool the selector loop pool for handling all I/O events (accept, read, write)
+     * @param ioHandlerExecutor used for executing IoHandler event in another pool of thread (not in the low level I/O
+     *        one). Use <code>null</code> if you don't want one. Be careful, the IoHandler processing will block the I/O
+     *        operations.
+     */
+    public NioTcpServer(SelectorLoopPool selectorLoopPool, IoHandlerExecutor handlerExecutor) {
+        this(selectorLoopPool.getSelectorLoop(), selectorLoopPool, handlerExecutor);
+    }
+
+    /**
+     * Create a TCP server with provided selector loops pool. We will use one SelectorLoop get from the pool to manage
+     * the OP_ACCEPT events. If the pool contains only one SelectorLoop, then all the events will be managed by the same
+     * Selector.
+     * 
+     * @param config The specific configuration to use
+     * @param selectorLoopPool the selector loop pool for handling all I/O events (accept, read, write)
+     * @param ioHandlerExecutor used for executing IoHandler event in another pool of thread (not in the low level I/O
+     *        one). Use <code>null</code> if you don't want one. Be careful, the IoHandler processing will block the I/O
+     *        operations.
+     */
+    public NioTcpServer(TcpSessionConfig config, SelectorLoopPool selectorLoopPool, IoHandlerExecutor handlerExecutor) {
+        this(config, selectorLoopPool.getSelectorLoop(), selectorLoopPool, handlerExecutor);
+    }
+
+    /**
+     * Create a TCP server with provided selector loops pool
+     * 
+     * @param acceptSelectorLoop the selector loop for handling accept events (connection of new session)
+     * @param readWriteSelectorLoop the pool of selector loop for handling read/write events of connected sessions
+     * @param ioHandlerExecutor used for executing IoHandler event in another pool of thread (not in the low level I/O
+     *        one). Use <code>null</code> if you don't want one. Be careful, the IoHandler processing will block the I/O
+     *        operations.
+     */
+    public NioTcpServer(SelectorLoop acceptSelectorLoop, SelectorLoopPool readWriteSelectorLoop,
+            IoHandlerExecutor handlerExecutor) {
+        super(handlerExecutor);
+        this.acceptSelectorLoop = acceptSelectorLoop;
+        this.readWriteSelectorPool = readWriteSelectorLoop;
+    }
+
+    /**
+     * Create a TCP server with provided selector loops pool
+     * 
+     * @param config The specific configuration to use
+     * @param acceptSelectorLoop the selector loop for handling accept events (connection of new session)
+     * @param readWriteSelectorLoop the pool of selector loop for handling read/write events of connected sessions
+     * @param ioHandlerExecutor used for executing IoHandler event in another pool of thread (not in the low level I/O
+     *        one). Use <code>null</code> if you don't want one. Be careful, the IoHandler processing will block the I/O
+     *        operations.
+     */
+    public NioTcpServer(TcpSessionConfig config, SelectorLoop acceptSelectorLoop,
+            SelectorLoopPool readWriteSelectorLoop, IoHandlerExecutor handlerExecutor) {
+        super(config, handlerExecutor);
+        this.acceptSelectorLoop = acceptSelectorLoop;
+        this.readWriteSelectorPool = readWriteSelectorLoop;
+    }
+
+    /**
+     * Get the inner Server socket for accepting new client connections
+     * 
+     * @return
+     */
+    public ServerSocketChannel getServerSocketChannel() {
+        return this.serverChannel;
+    }
+
+    public void setServerSocketChannel(final ServerSocketChannel serverChannel) {
+        this.serverChannel = serverChannel;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void bind(final int port) throws IOException {
+        bind(new InetSocketAddress(port));
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public synchronized void bind(final SocketAddress localAddress) throws IOException {
+        Assert.assertNotNull(localAddress, "localAddress");
+
+        // check if the address is already bound
+        if (this.address != null) {
+            throw new IOException("address " + address + " already bound");
+        }
+
+        LOG.info("binding address {}", localAddress);
+        this.address = localAddress;
+
+        serverChannel = ServerSocketChannel.open();
+        serverChannel.socket().setReuseAddress(isReuseAddress());
+        serverChannel.socket().bind(address);
+        serverChannel.configureBlocking(false);
+
+        acceptSelectorLoop.register(true, false, false, false, this, serverChannel, null);
+
+        idleChecker = new IndexedIdleChecker();
+        idleChecker.start();
+
+        // it's the first address bound, let's fire the event
+        this.fireServiceActivated();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public SocketAddress getBoundAddress() {
+        return address;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public synchronized void unbind() throws IOException {
+        LOG.info("unbinding {}", address);
+        if (this.address == null) {
+            throw new IllegalStateException("server not bound");
+        }
+        serverChannel.socket().close();
+        serverChannel.close();
+        acceptSelectorLoop.unregister(this, serverChannel);
+
+        this.address = null;
+        this.fireServiceInactivated();
+
+        // will stop the acceptor processor if we are the last service
+        idleChecker.destroy();
+    }
+
+    /**
+     * @return the acceptKey
+     */
+    public SelectionKey getAcceptKey() {
+        return acceptKey;
+    }
+
+    /**
+     * @param acceptKey the acceptKey to set
+     */
+    public void setAcceptKey(final SelectionKey acceptKey) {
+        this.acceptKey = acceptKey;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void ready(final boolean accept, boolean connect, final boolean read, final ByteBuffer readBuffer,
+            final boolean write) {
+        if (accept) {
+            LOG.debug("acceptable new client");
+
+            // accepted connection
+            try {
+                LOG.debug("new client accepted");
+                createSession(getServerSocketChannel().accept());
+
+            } catch (final IOException e) {
+                LOG.error("error while accepting new client", e);
+            }
+        }
+
+        if (read || write) {
+            throw new IllegalStateException("should not receive read or write events");
+        }
+    }
+
+    private void createSession(SocketChannel clientSocket) throws IOException {
+        LOG.debug("create session");
+        SocketChannel socketChannel = clientSocket;
+        TcpSessionConfig config = getSessionConfig();
+        SelectorLoop readWriteSelectorLoop = readWriteSelectorPool.getSelectorLoop();
+        final NioTcpSession session = new NioTcpSession(this, socketChannel, readWriteSelectorLoop, idleChecker);
+
+        socketChannel.configureBlocking(false);
+
+        // apply idle configuration
+        session.getConfig().setIdleTimeInMillis(IdleStatus.READ_IDLE, config.getIdleTimeInMillis(IdleStatus.READ_IDLE));
+        session.getConfig().setIdleTimeInMillis(IdleStatus.WRITE_IDLE,
+                config.getIdleTimeInMillis(IdleStatus.WRITE_IDLE));
+
+        // apply the default service socket configuration
+        Boolean keepAlive = config.isKeepAlive();
+
+        if (keepAlive != null) {
+            session.getConfig().setKeepAlive(keepAlive);
+        }
+
+        Boolean oobInline = config.isOobInline();
+
+        if (oobInline != null) {
+            session.getConfig().setOobInline(oobInline);
+        }
+
+        Boolean reuseAddress = config.isReuseAddress();
+
+        if (reuseAddress != null) {
+            session.getConfig().setReuseAddress(reuseAddress);
+        }
+
+        Boolean tcpNoDelay = config.isTcpNoDelay();
+
+        if (tcpNoDelay != null) {
+            session.getConfig().setTcpNoDelay(tcpNoDelay);
+        }
+
+        Integer receiveBufferSize = config.getReadBufferSize();
+
+        if (receiveBufferSize != null) {
+            session.getConfig().setReadBufferSize(receiveBufferSize);
+        }
+
+        Integer sendBufferSize = config.getSendBufferSize();
+
+        if (sendBufferSize != null) {
+            session.getConfig().setSendBufferSize(sendBufferSize);
+        }
+
+        Integer trafficClass = config.getTrafficClass();
+
+        if (trafficClass != null) {
+            session.getConfig().setTrafficClass(trafficClass);
+        }
+
+        Integer soLinger = config.getSoLinger();
+
+        if (soLinger != null) {
+            session.getConfig().setSoLinger(soLinger);
+        }
+
+        // Set the secured flag if the service is to be used over SSL/TLS
+        if (config.isSecured()) {
+            session.initSecure(config.getSslContext());
+        }
+
+        // add the session to the queue for being added to the selector
+        readWriteSelectorLoop.register(false, false, true, false, session, socketChannel, new RegistrationCallback() {
+
+            @Override
+            public void done(SelectionKey selectionKey) {
+                session.setSelectionKey(selectionKey);
+                session.setConnected();
+            }
+        });
+
+        idleChecker.sessionRead(session, System.currentTimeMillis());
+        idleChecker.sessionWritten(session, System.currentTimeMillis());
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mina/blob/0de06254/core/src/main/java/org/apache/mina/transport/nio/tcp/NioTcpSession.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mina/transport/nio/tcp/NioTcpSession.java b/core/src/main/java/org/apache/mina/transport/nio/tcp/NioTcpSession.java
new file mode 100644
index 0000000..f06fda7
--- /dev/null
+++ b/core/src/main/java/org/apache/mina/transport/nio/tcp/NioTcpSession.java
@@ -0,0 +1,409 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.transport.nio.tcp;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+import org.apache.mina.api.IoService;
+import org.apache.mina.api.IoSession;
+import org.apache.mina.service.idlechecker.IdleChecker;
+import org.apache.mina.session.AbstractIoSession;
+import org.apache.mina.session.SslHelper;
+import org.apache.mina.session.WriteRequest;
+import org.apache.mina.transport.nio.RegistrationCallback;
+import org.apache.mina.transport.nio.SelectorListener;
+import org.apache.mina.transport.nio.SelectorLoop;
+import org.apache.mina.transport.tcp.ProxyTcpSessionConfig;
+import org.apache.mina.transport.tcp.TcpSessionConfig;
+import org.apache.mina.util.AbstractIoFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A NIO based TCP session, should be used by {@link NioTcpServer} and {@link NioTcpClient}. A TCP session is a
+ * connection between a our server/client and the remote end-point.
+ * 
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ * 
+ */
+public class NioTcpSession extends AbstractIoSession implements SelectorListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NioTcpSession.class);
+
+    /** the selector loop in charge of generating read/write events for this session */
+    private final SelectorLoop selectorLoop;
+
+    /** the socket configuration */
+    private final TcpSessionConfig configuration;
+
+    /** the future representing this session connection operation (client only) */
+    private ConnectFuture connectFuture;
+
+    /** The associated selectionKey */
+    private SelectionKey selectionKey;
+
+    /** The Direct Buffer used to send data */
+    private ByteBuffer sendBuffer;
+
+    /** The size of the buffer configured in the socket to send data */
+    private int sendBufferSize;
+
+    /* No qualifier*/NioTcpSession(final IoService service, final SocketChannel channel,
+            final SelectorLoop selectorLoop, final IdleChecker idleChecker) {
+        super(service, channel, idleChecker);
+        this.selectorLoop = selectorLoop;
+        this.configuration = new ProxyTcpSessionConfig(channel.socket());
+        sendBufferSize = configuration.getSendBufferSize();
+        sendBuffer = ByteBuffer.allocateDirect(sendBufferSize);
+    }
+
+    void setConnectFuture(ConnectFuture connectFuture) {
+        this.connectFuture = connectFuture;
+    }
+
+    /**
+     * Get the underlying {@link SocketChannel} of this session
+     * 
+     * @return the socket channel used by this session
+     */
+    SocketChannel getSocketChannel() {
+        return (SocketChannel) channel;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public InetSocketAddress getRemoteAddress() {
+        if (channel == null) {
+            return null;
+        }
+        final Socket socket = ((SocketChannel) channel).socket();
+
+        if (socket == null) {
+            return null;
+        }
+
+        return (InetSocketAddress) socket.getRemoteSocketAddress();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public InetSocketAddress getLocalAddress() {
+        if (channel == null) {
+            return null;
+        }
+
+        final Socket socket = ((SocketChannel) channel).socket();
+
+        if (socket == null) {
+            return null;
+        }
+
+        return (InetSocketAddress) socket.getLocalSocketAddress();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void suspendRead() {
+        // TODO
+        throw new RuntimeException("Not implemented");
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void suspendWrite() {
+        // TODO
+        throw new RuntimeException("Not implemented");
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected int writeDirect(Object message) {
+        try {
+            // Check that we can write into the channel
+            if (!isRegisteredForWrite()) {
+                // We don't have pending writes
+                return ((SocketChannel) channel).write((ByteBuffer) message);
+            } else {
+                return -1;
+            }
+        } catch (final IOException e) {
+            LOG.error("Exception while reading : ", e);
+            processException(e);
+
+            return -1;
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected ByteBuffer convertToDirectBuffer(WriteRequest writeRequest, boolean createNew) {
+        ByteBuffer message = (ByteBuffer) writeRequest.getMessage();
+
+        if (!message.isDirect()) {
+            int remaining = message.remaining();
+
+            if ((remaining > sendBufferSize) || createNew) {
+                ByteBuffer directBuffer = ByteBuffer.allocateDirect(remaining);
+                directBuffer.put(message);
+                directBuffer.flip();
+                writeRequest.setMessage(directBuffer);
+
+                return directBuffer;
+            } else {
+                sendBuffer.clear();
+                sendBuffer.put(message);
+                sendBuffer.flip();
+                writeRequest.setMessage(sendBuffer);
+
+                return sendBuffer;
+            }
+        }
+
+        return message;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void resumeRead() {
+        // TODO
+        throw new RuntimeException("Not implemented");
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void resumeWrite() {
+        // TODO
+        throw new RuntimeException("Not implemented");
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean isReadSuspended() {
+        // TODO
+        return false;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean isWriteSuspended() {
+        // TODO
+        throw new RuntimeException("Not implemented");
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public TcpSessionConfig getConfig() {
+        return configuration;
+    }
+
+    /**
+     * Set this session status as connected. To be called by the processor selecting/polling this session.
+     */
+    void setConnected() {
+        if (!isCreated()) {
+            throw new RuntimeException("Trying to open a non created session");
+        }
+
+        state = SessionState.CONNECTED;
+
+        if (connectFuture != null) {
+            connectFuture.complete(this);
+            connectFuture = null; // free some memory
+        }
+
+        processSessionOpen();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected void channelClose() {
+        try {
+            selectorLoop.unregister(this, channel);
+            channel.close();
+        } catch (final IOException e) {
+            LOG.error("Exception while closing the channel : ", e);
+            processException(e);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void flushWriteQueue() {
+        // register for write
+        selectorLoop.modifyRegistration(false, !isReadSuspended(), true, this, channel, true);
+    }
+
+    /**
+     * Process a read operation : read the data from the channel and push them to the chain.
+     * 
+     * @param readBuffer The buffer that will contain the read data
+     */
+    private void processRead(final ByteBuffer readBuffer) {
+        try {
+            LOG.debug("readable session : {}", this);
+
+            // First reset the buffer from what it contained before
+            readBuffer.clear();
+
+            // Read everything we can up to the buffer size
+            final int readCount = ((SocketChannel) channel).read(readBuffer);
+
+            LOG.debug("read {} bytes", readCount);
+
+            if (readCount < 0) {
+                // session closed by the remote peer
+                LOG.debug("session closed by the remote peer");
+                close(true);
+            } else if (readCount > 0) {
+                // we have read some data
+                // limit at the current position & rewind buffer back to start &
+                // push to the chain
+                readBuffer.flip();
+
+                if (isSecured()) {
+                    // We are reading data over a SSL/TLS encrypted connection.
+                    // Redirect the processing to the SslHelper class.
+                    final SslHelper sslHelper = getAttribute(SSL_HELPER, null);
+
+                    if (sslHelper == null) {
+                        throw new IllegalStateException();
+                    }
+
+                    sslHelper.processRead(this, readBuffer);
+                } else {
+                    // Plain message, not encrypted : go directly to the chain
+                    processMessageReceived(readBuffer);
+                }
+
+                // Update the session idle status
+                idleChecker.sessionRead(this, System.currentTimeMillis());
+            }
+        } catch (final IOException e) {
+            LOG.error("Exception while reading : ", e);
+            processException(e);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void ready(final boolean accept, boolean connect, final boolean read, final ByteBuffer readBuffer,
+            final boolean write) {
+        LOG.debug("session {} ready for accept={}, connect={}, read={}, write={}", new Object[] { this, accept,
+                connect, read, write });
+        if (connect) {
+            try {
+
+                boolean isConnected = ((SocketChannel) channel).finishConnect();
+
+                if (!isConnected) {
+                    LOG.error("unable to connect session {}", this);
+                } else {
+                    // cancel current registration for connection
+                    selectionKey.cancel();
+                    selectionKey = null;
+
+                    // Register for reading
+                    selectorLoop.register(false, false, true, false, this, channel, new RegistrationCallback() {
+
+                        @Override
+                        public void done(SelectionKey selectionKey) {
+                            setConnected();
+                        }
+                    });
+                }
+            } catch (IOException e) {
+                LOG.debug("Connection error, we cancel the future", e);
+                if (connectFuture != null) {
+                    connectFuture.error(e);
+                }
+            }
+        }
+
+        if (read) {
+            processRead(readBuffer);
+        }
+
+        if (write) {
+            processWrite(selectorLoop);
+        }
+        if (accept) {
+            throw new IllegalStateException("accept event should never occur on NioTcpSession");
+        }
+    }
+
+    void setSelectionKey(SelectionKey key) {
+        this.selectionKey = key;
+    }
+
+    static class ConnectFuture extends AbstractIoFuture<IoSession> {
+
+        @Override
+        protected boolean cancelOwner(boolean mayInterruptIfRunning) {
+            return false;
+        }
+
+        /**
+         * session connected
+         */
+        public void complete(IoSession session) {
+            setResult(session);
+        }
+
+        /**
+         * connection error
+         */
+        public void error(Exception e) {
+            setException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/mina/blob/0de06254/core/src/main/java/org/apache/mina/transport/nio/udp/NioUdpClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mina/transport/nio/udp/NioUdpClient.java b/core/src/main/java/org/apache/mina/transport/nio/udp/NioUdpClient.java
new file mode 100644
index 0000000..8ccfd98
--- /dev/null
+++ b/core/src/main/java/org/apache/mina/transport/nio/udp/NioUdpClient.java
@@ -0,0 +1,82 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.transport.nio.udp;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.channels.DatagramChannel;
+
+import org.apache.mina.api.IoFuture;
+import org.apache.mina.api.IoSession;
+import org.apache.mina.api.IoSessionConfig;
+import org.apache.mina.service.executor.IoHandlerExecutor;
+import org.apache.mina.transport.nio.NioSelectorLoop;
+import org.apache.mina.transport.nio.SelectorLoop;
+import org.apache.mina.transport.udp.AbstractUdpClient;
+
+/**
+ * TODO
+ * 
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ */
+public class NioUdpClient extends AbstractUdpClient {
+    /** the SelectorLoop for connecting the sessions */
+    // This is final, so that we know if it's not initialized
+    private final SelectorLoop connectSelectorLoop;
+
+    /**
+     * Create a new instance of NioUdpClient
+     */
+    public NioUdpClient() {
+        this(null);
+    }
+
+    /**
+     * Create a new instance of NioUdpClient
+     */
+    public NioUdpClient(IoHandlerExecutor ioHandlerExecutor) {
+        super(ioHandlerExecutor);
+        connectSelectorLoop = new NioSelectorLoop("connect", 0);
+    }
+
+    @Override
+    public IoSessionConfig getSessionConfig() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public IoFuture<IoSession> connect(SocketAddress remoteAddress) throws IOException {
+        DatagramChannel ch = DatagramChannel.open();
+
+        if (remoteAddress != null) {
+            ch.socket().bind(remoteAddress);
+            ch.connect(remoteAddress);
+        }
+
+        return null;
+    }
+
+    @Override
+    public IoFuture<IoSession> connect(SocketAddress remoteAddress, SocketAddress localAddress) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/mina/blob/0de06254/core/src/main/java/org/apache/mina/transport/nio/udp/NioUdpServer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mina/transport/nio/udp/NioUdpServer.java b/core/src/main/java/org/apache/mina/transport/nio/udp/NioUdpServer.java
new file mode 100644
index 0000000..ebc6a7c
--- /dev/null
+++ b/core/src/main/java/org/apache/mina/transport/nio/udp/NioUdpServer.java
@@ -0,0 +1,297 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.transport.nio.udp;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.mina.api.IdleStatus;
+import org.apache.mina.service.executor.IoHandlerExecutor;
+import org.apache.mina.service.executor.OrderedHandlerExecutor;
+import org.apache.mina.service.idlechecker.IdleChecker;
+import org.apache.mina.service.idlechecker.IndexedIdleChecker;
+import org.apache.mina.transport.nio.NioSelectorLoop;
+import org.apache.mina.transport.nio.SelectorListener;
+import org.apache.mina.transport.nio.SelectorLoop;
+import org.apache.mina.transport.udp.AbstractUdpServer;
+import org.apache.mina.transport.udp.UdpSessionConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements a UDP NIO based server.
+ * 
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ */
+public class NioUdpServer extends AbstractUdpServer implements SelectorListener {
+
+    static final Logger LOG = LoggerFactory.getLogger(NioUdpServer.class);
+
+    // the bound local address
+    private SocketAddress address = null;
+
+    // used for detecting idle sessions
+    private final IdleChecker idleChecker = new IndexedIdleChecker();
+
+    // the inner channel for read/write UDP datagrams
+    private DatagramChannel datagramChannel = null;
+
+    // the key used for selecting read event
+    private SelectionKey readKey = null;
+
+    // list of all the sessions by remote socket address
+    private final Map<SocketAddress /* remote socket address */, NioUdpSession> sessions = new ConcurrentHashMap<SocketAddress, NioUdpSession>();
+
+    /** The selector loop used to incoming data */
+    private final SelectorLoop readSelectorLoop;
+
+    /**
+     * Create an UDP server with a new selector pool of default size and a {@link IoHandlerExecutor} of default type (
+     * {@link OrderedHandlerExecutor})
+     */
+    public NioUdpServer() {
+        this(new NioSelectorLoop("accept", 0), null);
+    }
+
+    /**
+     * Create an UDP server with a new selector pool of default size and a {@link IoHandlerExecutor} of default type (
+     * {@link OrderedHandlerExecutor})
+     * 
+     * @param sessionConfig The configuration to use for this server
+     */
+    public NioUdpServer(UdpSessionConfig config) {
+        this(config, new NioSelectorLoop("accept", 0), null);
+    }
+
+    /**
+     * Create an UDP server with provided selector loops pool
+     * 
+     * @param acceptSelectorLoop the selector loop for handling accept events (connection of new session)
+     * @param readWriteSelectorLoop the pool of selector loop for handling read/write events of connected sessions
+     * @param ioHandlerExecutor used for executing IoHandler event in another pool of thread (not in the low level I/O
+     *        one). Use <code>null</code> if you don't want one. Be careful, the IoHandler processing will block the I/O
+     *        operations.
+     */
+    public NioUdpServer(SelectorLoop readSelectorLoop, IoHandlerExecutor handlerExecutor) {
+        super(handlerExecutor);
+        this.readSelectorLoop = readSelectorLoop;
+    }
+
+    /**
+     * Create an UDP server with provided selector loops pool
+     * 
+     * @param sessionConfig The configuration to use for this server
+     * @param acceptSelectorLoop the selector loop for handling accept events (connection of new session)
+     * @param ioHandlerExecutor used for executing IoHandler event in another pool of thread (not in the low level I/O
+     *        one). Use <code>null</code> if you don't want one. Be careful, the IoHandler processing will block the I/O
+     *        operations.
+     */
+    public NioUdpServer(UdpSessionConfig config, SelectorLoop readSelectorLoop, IoHandlerExecutor handlerExecutor) {
+        super(config, handlerExecutor);
+        this.readSelectorLoop = readSelectorLoop;
+    }
+
+    /**
+     * Get the inner datagram channel for read and write operations. To be called by the {@link NioSelectorProcessor}
+     * 
+     * @return the datagram channel bound to this {@link NioUdpServer}.
+     */
+    public DatagramChannel getDatagramChannel() {
+        return datagramChannel;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public SocketAddress getBoundAddress() {
+        return address;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void bind(final int port) throws IOException {
+        bind(new InetSocketAddress(port));
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void bind(final SocketAddress localAddress) throws IOException {
+        if (localAddress == null) {
+            // We should at least have one address to bind on
+            throw new IllegalArgumentException("LocalAdress cannot be null");
+        }
+
+        // check if the address is already bound
+        if (this.address != null) {
+            throw new IOException("address " + address + " already bound");
+        }
+        address = localAddress;
+
+        LOG.info("binding address {}", localAddress);
+
+        datagramChannel = DatagramChannel.open();
+
+        datagramChannel.socket().setReuseAddress(isReuseAddress());
+        datagramChannel.socket().bind(address);
+        datagramChannel.configureBlocking(false);
+
+        readSelectorLoop.register(false, false, true, false, this, datagramChannel, null);
+
+        // it's the first address bound, let's fire the event
+        this.fireServiceActivated();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void unbind() throws IOException {
+        LOG.info("unbinding {}", address);
+        if (this.address == null) {
+            throw new IllegalStateException("server not bound");
+        }
+
+        readSelectorLoop.unregister(this, datagramChannel);
+        datagramChannel.socket().close();
+        datagramChannel.close();
+
+        this.address = null;
+        this.fireServiceInactivated();
+    }
+
+    /**
+     * @return the readKey
+     */
+    public SelectionKey getReadKey() {
+        return readKey;
+    }
+
+    /**
+     * @param readKey the readKey to set
+     */
+    public void setReadKey(final SelectionKey readKey) {
+        this.readKey = readKey;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void ready(final boolean accept, boolean connect, final boolean read, final ByteBuffer readBuffer,
+            final boolean write) {
+        // Process the reads first
+        try {
+            final SocketAddress source = datagramChannel.receive(readBuffer);
+            NioUdpSession session = null;
+
+            // let's find the corresponding session
+            if (source != null) {
+                session = sessions.get(source);
+
+                if (session == null) {
+                    session = createSession(source, datagramChannel);
+                }
+                if (read) {
+                    LOG.debug("readable datagram for UDP service : {}", this);
+                    readBuffer.clear();
+
+                    readBuffer.flip();
+
+                    LOG.debug("read {} bytes form {}", readBuffer.remaining(), source);
+
+                    session.receivedDatagram(readBuffer);
+
+                }
+
+                // Now, process the writes
+                if (write) {
+                    session.processWrite(readSelectorLoop);
+                }
+            } else {
+                LOG.debug("Do data to read");
+            }
+
+        } catch (final IOException ex) {
+            LOG.error("IOException while reading the socket", ex);
+        }
+    }
+
+    private NioUdpSession createSession(SocketAddress remoteAddress, DatagramChannel datagramChannel)
+            throws IOException {
+        LOG.debug("create session");
+        UdpSessionConfig config = getSessionConfig();
+        SocketAddress localAddress = new InetSocketAddress(datagramChannel.socket().getLocalAddress(), datagramChannel
+                .socket().getLocalPort());
+        final NioUdpSession session = new NioUdpSession(this, idleChecker, datagramChannel, localAddress, remoteAddress);
+
+        // apply idle configuration
+        session.getConfig().setIdleTimeInMillis(IdleStatus.READ_IDLE, config.getIdleTimeInMillis(IdleStatus.READ_IDLE));
+        session.getConfig().setIdleTimeInMillis(IdleStatus.WRITE_IDLE,
+                config.getIdleTimeInMillis(IdleStatus.WRITE_IDLE));
+
+        // apply the default service socket configuration
+
+        Boolean reuseAddress = config.isReuseAddress();
+
+        if (reuseAddress != null) {
+            session.getConfig().setReuseAddress(reuseAddress);
+        }
+
+        Integer readBufferSize = config.getReadBufferSize();
+
+        if (readBufferSize != null) {
+            session.getConfig().setReadBufferSize(readBufferSize);
+        }
+
+        Integer sendBufferSize = config.getSendBufferSize();
+
+        if (sendBufferSize != null) {
+            session.getConfig().setSendBufferSize(sendBufferSize);
+        }
+
+        Integer trafficClass = config.getTrafficClass();
+
+        if (trafficClass != null) {
+            session.getConfig().setTrafficClass(trafficClass);
+        }
+
+        // Manage the Idle status
+        idleChecker.sessionRead(session, System.currentTimeMillis());
+        idleChecker.sessionWritten(session, System.currentTimeMillis());
+
+        sessions.put(remoteAddress, session);
+
+        // Inform the handler that the session has been created
+        session.setConnected();
+
+        return session;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mina/blob/0de06254/core/src/main/java/org/apache/mina/transport/nio/udp/NioUdpSession.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mina/transport/nio/udp/NioUdpSession.java b/core/src/main/java/org/apache/mina/transport/nio/udp/NioUdpSession.java
new file mode 100644
index 0000000..6f099bc
--- /dev/null
+++ b/core/src/main/java/org/apache/mina/transport/nio/udp/NioUdpSession.java
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.mina.transport.nio.udp;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+
+import org.apache.mina.api.IoFuture;
+import org.apache.mina.api.IoService;
+import org.apache.mina.service.idlechecker.IdleChecker;
+import org.apache.mina.session.AbstractIoSession;
+import org.apache.mina.session.WriteRequest;
+import org.apache.mina.transport.nio.SelectorListener;
+import org.apache.mina.transport.udp.UdpSessionConfig;
+import org.apache.mina.util.AbstractIoFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A UDP session based on NIO
+ * 
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ */
+public class NioUdpSession extends AbstractIoSession implements SelectorListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NioUdpSession.class);
+
+    private final SocketAddress localAddress;
+
+    private final SocketAddress remoteAddress;
+
+    /** the socket configuration */
+    private final UdpSessionConfig configuration;
+
+    /** we pre-allocate a close future for lock-less {@link #close(boolean)} */
+    private final IoFuture<Void> closeFuture = new AbstractIoFuture<Void>() {
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override
+        protected boolean cancelOwner(boolean mayInterruptIfRunning) {
+            // we don't cancel close
+            return false;
+        }
+    };
+
+    /**
+     * @param service
+     * @param writeProcessor
+     * @param idleChecker
+     */
+    /* No qualifier*/NioUdpSession(IoService service, IdleChecker idleChecker, DatagramChannel datagramChannel,
+            SocketAddress localAddress, SocketAddress remoteAddress) {
+        super(service, datagramChannel, idleChecker);
+        this.localAddress = localAddress;
+        this.remoteAddress = remoteAddress;
+        this.config = service.getSessionConfig();
+        this.configuration = (UdpSessionConfig) this.config;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected void channelClose() {
+        // No inner socket to close for UDP
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void flushWriteQueue() {
+        // TODO flush queue
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public SocketAddress getRemoteAddress() {
+        return remoteAddress;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public SocketAddress getLocalAddress() {
+        return localAddress;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public IoFuture<Void> close(boolean immediately) {
+        switch (state) {
+        case CREATED:
+            LOG.error("Session {} not opened", this);
+            throw new IllegalStateException("cannot close an not opened session");
+        case CONNECTED:
+        case CLOSING:
+            if (immediately) {
+                state = SessionState.CLOSED;
+            } else {
+                // we wait for the write queue to be depleted
+                state = SessionState.CLOSING;
+            }
+            break;
+        case CLOSED:
+            LOG.warn("Already closed session {}", this);
+            break;
+        default:
+            throw new IllegalStateException("not implemented session state : " + state);
+        }
+        return closeFuture;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void suspendRead() {
+        // TODO
+        throw new RuntimeException("Not implemented");
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void suspendWrite() {
+        // TODO
+        throw new RuntimeException("Not implemented");
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void resumeRead() {
+        // TODO
+        throw new RuntimeException("Not implemented");
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void resumeWrite() {
+        // TODO
+        throw new RuntimeException("Not implemented");
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean isReadSuspended() {
+        // TODO
+        throw new RuntimeException("Not implemented");
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean isWriteSuspended() {
+        // TODO
+        throw new RuntimeException("Not implemented");
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public UdpSessionConfig getConfig() {
+        return configuration;
+    }
+
+    /**
+     * Called when the session received a datagram.
+     * 
+     * @param readBuffer the received datagram
+     */
+    void receivedDatagram(ByteBuffer readBuffer) {
+        processMessageReceived(readBuffer);
+        idleChecker.sessionRead(this, System.currentTimeMillis());
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected int writeDirect(Object message) {
+        try {
+            // Check that we can write into the channel
+            if (!isRegisteredForWrite()) {
+                // We don't have pending writes
+                // First, connect if we aren't already connected
+                if (!((DatagramChannel) channel).isConnected()) {
+                    ((DatagramChannel) channel).connect(remoteAddress);
+                }
+
+                // And try to write the data. We will either write them all,
+                // or none
+                return ((DatagramChannel) channel).write((ByteBuffer) message);
+            } else {
+                System.out.println("Cannot write");
+                return -1;
+            }
+        } catch (final IOException e) {
+            LOG.error("Exception while reading : ", e);
+            processException(e);
+
+            return -1;
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected ByteBuffer convertToDirectBuffer(WriteRequest writeRequest, boolean createNew) {
+        ByteBuffer message = (ByteBuffer) writeRequest.getMessage();
+
+        if (!message.isDirect()) {
+            int remaining = message.remaining();
+
+            ByteBuffer directBuffer = ByteBuffer.allocateDirect(remaining);
+            directBuffer.put(message);
+            directBuffer.flip();
+            writeRequest.setMessage(directBuffer);
+
+            return directBuffer;
+        }
+
+        return message;
+    }
+
+    void setSelectionKey(SelectionKey key) {
+        //this.selectionKey = key;
+    }
+
+    /**
+     * Set this session status as connected. To be called by the processor selecting/polling this session.
+     */
+    void setConnected() {
+        if (!isCreated()) {
+            throw new RuntimeException("Trying to open a non created session");
+        }
+
+        state = SessionState.CONNECTED;
+
+        /*if (connectFuture != null) {
+            connectFuture.complete(this);
+            connectFuture = null; // free some memory
+        }*/
+
+        processSessionOpen();
+    }
+
+    @Override
+    public void ready(boolean accept, boolean connect, boolean read, ByteBuffer readBuffer, boolean write) {
+    }
+}