You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by el...@apache.org on 2013/03/25 00:32:27 UTC
[2/7] o Moved the processWrite method into the AbstraIoSession class
http://git-wip-us.apache.org/repos/asf/mina/blob/0de06254/core/src/main/java/org/apache/mina/transport/nio/tcp/NioTcpServer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mina/transport/nio/tcp/NioTcpServer.java b/core/src/main/java/org/apache/mina/transport/nio/tcp/NioTcpServer.java
new file mode 100644
index 0000000..fdffd7a
--- /dev/null
+++ b/core/src/main/java/org/apache/mina/transport/nio/tcp/NioTcpServer.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.nio.tcp;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+
+import org.apache.mina.api.IdleStatus;
+import org.apache.mina.service.executor.IoHandlerExecutor;
+import org.apache.mina.service.executor.OrderedHandlerExecutor;
+import org.apache.mina.service.idlechecker.IdleChecker;
+import org.apache.mina.service.idlechecker.IndexedIdleChecker;
+import org.apache.mina.transport.nio.FixedSelectorLoopPool;
+import org.apache.mina.transport.nio.NioSelectorLoop;
+import org.apache.mina.transport.nio.RegistrationCallback;
+import org.apache.mina.transport.nio.SelectorListener;
+import org.apache.mina.transport.nio.SelectorLoop;
+import org.apache.mina.transport.nio.SelectorLoopPool;
+import org.apache.mina.transport.tcp.AbstractTcpServer;
+import org.apache.mina.transport.tcp.TcpSessionConfig;
+import org.apache.mina.util.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements a TCP NIO based server.
+ *
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ */
+public class NioTcpServer extends AbstractTcpServer implements SelectorListener {
+ /** A logger for this class */
+ static final Logger LOG = LoggerFactory.getLogger(NioTcpServer.class);
+
+ /** the bound local address */
+ private SocketAddress address = null;
+
+ private final SelectorLoop acceptSelectorLoop;
+
+ private final SelectorLoopPool readWriteSelectorPool;
+
+ // the key used for selecting accept event
+ private SelectionKey acceptKey = null;
+
+ // the server socket for accepting clients
+ private ServerSocketChannel serverChannel = null;
+
+ private IdleChecker idleChecker;
+
+ /**
+ * Create a TCP server with new selector pool of default size and a {@link IoHandlerExecutor} of default type (
+ * {@link OrderedHandlerExecutor})
+ */
+ public NioTcpServer() {
+ this(new NioSelectorLoop("accept", 0), new FixedSelectorLoopPool("Server", Runtime.getRuntime()
+ .availableProcessors() + 1), null);
+ }
+
+ /**
+ * Create a TCP server with new selector pool of default size and a {@link IoHandlerExecutor} of default type (
+ * {@link OrderedHandlerExecutor})
+ *
+ * @param config The specific configuration to use
+ */
+ public NioTcpServer(TcpSessionConfig config) {
+ this(config, new NioSelectorLoop("accept", 0), new FixedSelectorLoopPool("Server", Runtime.getRuntime()
+ .availableProcessors() + 1), null);
+ }
+
+ /**
+ * Create a TCP server with provided selector loops pool. We will use one SelectorLoop get from the pool to manage
+ * the OP_ACCEPT events. If the pool contains only one SelectorLoop, then all the events will be managed by the same
+ * Selector.
+ *
+ * @param selectorLoopPool the selector loop pool for handling all I/O events (accept, read, write)
+ * @param ioHandlerExecutor used for executing IoHandler event in another pool of thread (not in the low level I/O
+ * one). Use <code>null</code> if you don't want one. Be careful, the IoHandler processing will block the I/O
+ * operations.
+ */
+ public NioTcpServer(SelectorLoopPool selectorLoopPool, IoHandlerExecutor handlerExecutor) {
+ this(selectorLoopPool.getSelectorLoop(), selectorLoopPool, handlerExecutor);
+ }
+
+ /**
+ * Create a TCP server with provided selector loops pool. We will use one SelectorLoop get from the pool to manage
+ * the OP_ACCEPT events. If the pool contains only one SelectorLoop, then all the events will be managed by the same
+ * Selector.
+ *
+ * @param config The specific configuration to use
+ * @param selectorLoopPool the selector loop pool for handling all I/O events (accept, read, write)
+ * @param ioHandlerExecutor used for executing IoHandler event in another pool of thread (not in the low level I/O
+ * one). Use <code>null</code> if you don't want one. Be careful, the IoHandler processing will block the I/O
+ * operations.
+ */
+ public NioTcpServer(TcpSessionConfig config, SelectorLoopPool selectorLoopPool, IoHandlerExecutor handlerExecutor) {
+ this(config, selectorLoopPool.getSelectorLoop(), selectorLoopPool, handlerExecutor);
+ }
+
+ /**
+ * Create a TCP server with provided selector loops pool
+ *
+ * @param acceptSelectorLoop the selector loop for handling accept events (connection of new session)
+ * @param readWriteSelectorLoop the pool of selector loop for handling read/write events of connected sessions
+ * @param ioHandlerExecutor used for executing IoHandler event in another pool of thread (not in the low level I/O
+ * one). Use <code>null</code> if you don't want one. Be careful, the IoHandler processing will block the I/O
+ * operations.
+ */
+ public NioTcpServer(SelectorLoop acceptSelectorLoop, SelectorLoopPool readWriteSelectorLoop,
+ IoHandlerExecutor handlerExecutor) {
+ super(handlerExecutor);
+ this.acceptSelectorLoop = acceptSelectorLoop;
+ this.readWriteSelectorPool = readWriteSelectorLoop;
+ }
+
+ /**
+ * Create a TCP server with provided selector loops pool
+ *
+ * @param config The specific configuration to use
+ * @param acceptSelectorLoop the selector loop for handling accept events (connection of new session)
+ * @param readWriteSelectorLoop the pool of selector loop for handling read/write events of connected sessions
+ * @param ioHandlerExecutor used for executing IoHandler event in another pool of thread (not in the low level I/O
+ * one). Use <code>null</code> if you don't want one. Be careful, the IoHandler processing will block the I/O
+ * operations.
+ */
+ public NioTcpServer(TcpSessionConfig config, SelectorLoop acceptSelectorLoop,
+ SelectorLoopPool readWriteSelectorLoop, IoHandlerExecutor handlerExecutor) {
+ super(config, handlerExecutor);
+ this.acceptSelectorLoop = acceptSelectorLoop;
+ this.readWriteSelectorPool = readWriteSelectorLoop;
+ }
+
+ /**
+ * Get the inner Server socket for accepting new client connections
+ *
+ * @return
+ */
+ public ServerSocketChannel getServerSocketChannel() {
+ return this.serverChannel;
+ }
+
+ public void setServerSocketChannel(final ServerSocketChannel serverChannel) {
+ this.serverChannel = serverChannel;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void bind(final int port) throws IOException {
+ bind(new InetSocketAddress(port));
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public synchronized void bind(final SocketAddress localAddress) throws IOException {
+ Assert.assertNotNull(localAddress, "localAddress");
+
+ // check if the address is already bound
+ if (this.address != null) {
+ throw new IOException("address " + address + " already bound");
+ }
+
+ LOG.info("binding address {}", localAddress);
+ this.address = localAddress;
+
+ serverChannel = ServerSocketChannel.open();
+ serverChannel.socket().setReuseAddress(isReuseAddress());
+ serverChannel.socket().bind(address);
+ serverChannel.configureBlocking(false);
+
+ acceptSelectorLoop.register(true, false, false, false, this, serverChannel, null);
+
+ idleChecker = new IndexedIdleChecker();
+ idleChecker.start();
+
+ // it's the first address bound, let's fire the event
+ this.fireServiceActivated();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public SocketAddress getBoundAddress() {
+ return address;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public synchronized void unbind() throws IOException {
+ LOG.info("unbinding {}", address);
+ if (this.address == null) {
+ throw new IllegalStateException("server not bound");
+ }
+ serverChannel.socket().close();
+ serverChannel.close();
+ acceptSelectorLoop.unregister(this, serverChannel);
+
+ this.address = null;
+ this.fireServiceInactivated();
+
+ // will stop the acceptor processor if we are the last service
+ idleChecker.destroy();
+ }
+
+ /**
+ * @return the acceptKey
+ */
+ public SelectionKey getAcceptKey() {
+ return acceptKey;
+ }
+
+ /**
+ * @param acceptKey the acceptKey to set
+ */
+ public void setAcceptKey(final SelectionKey acceptKey) {
+ this.acceptKey = acceptKey;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void ready(final boolean accept, boolean connect, final boolean read, final ByteBuffer readBuffer,
+ final boolean write) {
+ if (accept) {
+ LOG.debug("acceptable new client");
+
+ // accepted connection
+ try {
+ LOG.debug("new client accepted");
+ createSession(getServerSocketChannel().accept());
+
+ } catch (final IOException e) {
+ LOG.error("error while accepting new client", e);
+ }
+ }
+
+ if (read || write) {
+ throw new IllegalStateException("should not receive read or write events");
+ }
+ }
+
+ private void createSession(SocketChannel clientSocket) throws IOException {
+ LOG.debug("create session");
+ SocketChannel socketChannel = clientSocket;
+ TcpSessionConfig config = getSessionConfig();
+ SelectorLoop readWriteSelectorLoop = readWriteSelectorPool.getSelectorLoop();
+ final NioTcpSession session = new NioTcpSession(this, socketChannel, readWriteSelectorLoop, idleChecker);
+
+ socketChannel.configureBlocking(false);
+
+ // apply idle configuration
+ session.getConfig().setIdleTimeInMillis(IdleStatus.READ_IDLE, config.getIdleTimeInMillis(IdleStatus.READ_IDLE));
+ session.getConfig().setIdleTimeInMillis(IdleStatus.WRITE_IDLE,
+ config.getIdleTimeInMillis(IdleStatus.WRITE_IDLE));
+
+ // apply the default service socket configuration
+ Boolean keepAlive = config.isKeepAlive();
+
+ if (keepAlive != null) {
+ session.getConfig().setKeepAlive(keepAlive);
+ }
+
+ Boolean oobInline = config.isOobInline();
+
+ if (oobInline != null) {
+ session.getConfig().setOobInline(oobInline);
+ }
+
+ Boolean reuseAddress = config.isReuseAddress();
+
+ if (reuseAddress != null) {
+ session.getConfig().setReuseAddress(reuseAddress);
+ }
+
+ Boolean tcpNoDelay = config.isTcpNoDelay();
+
+ if (tcpNoDelay != null) {
+ session.getConfig().setTcpNoDelay(tcpNoDelay);
+ }
+
+ Integer receiveBufferSize = config.getReadBufferSize();
+
+ if (receiveBufferSize != null) {
+ session.getConfig().setReadBufferSize(receiveBufferSize);
+ }
+
+ Integer sendBufferSize = config.getSendBufferSize();
+
+ if (sendBufferSize != null) {
+ session.getConfig().setSendBufferSize(sendBufferSize);
+ }
+
+ Integer trafficClass = config.getTrafficClass();
+
+ if (trafficClass != null) {
+ session.getConfig().setTrafficClass(trafficClass);
+ }
+
+ Integer soLinger = config.getSoLinger();
+
+ if (soLinger != null) {
+ session.getConfig().setSoLinger(soLinger);
+ }
+
+ // Set the secured flag if the service is to be used over SSL/TLS
+ if (config.isSecured()) {
+ session.initSecure(config.getSslContext());
+ }
+
+ // add the session to the queue for being added to the selector
+ readWriteSelectorLoop.register(false, false, true, false, session, socketChannel, new RegistrationCallback() {
+
+ @Override
+ public void done(SelectionKey selectionKey) {
+ session.setSelectionKey(selectionKey);
+ session.setConnected();
+ }
+ });
+
+ idleChecker.sessionRead(session, System.currentTimeMillis());
+ idleChecker.sessionWritten(session, System.currentTimeMillis());
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mina/blob/0de06254/core/src/main/java/org/apache/mina/transport/nio/tcp/NioTcpSession.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mina/transport/nio/tcp/NioTcpSession.java b/core/src/main/java/org/apache/mina/transport/nio/tcp/NioTcpSession.java
new file mode 100644
index 0000000..f06fda7
--- /dev/null
+++ b/core/src/main/java/org/apache/mina/transport/nio/tcp/NioTcpSession.java
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.nio.tcp;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+import org.apache.mina.api.IoService;
+import org.apache.mina.api.IoSession;
+import org.apache.mina.service.idlechecker.IdleChecker;
+import org.apache.mina.session.AbstractIoSession;
+import org.apache.mina.session.SslHelper;
+import org.apache.mina.session.WriteRequest;
+import org.apache.mina.transport.nio.RegistrationCallback;
+import org.apache.mina.transport.nio.SelectorListener;
+import org.apache.mina.transport.nio.SelectorLoop;
+import org.apache.mina.transport.tcp.ProxyTcpSessionConfig;
+import org.apache.mina.transport.tcp.TcpSessionConfig;
+import org.apache.mina.util.AbstractIoFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A NIO based TCP session, should be used by {@link NioTcpServer} and {@link NioTcpClient}. A TCP session is a
+ * connection between a our server/client and the remote end-point.
+ *
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ *
+ */
+public class NioTcpSession extends AbstractIoSession implements SelectorListener {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NioTcpSession.class);
+
+ /** the selector loop in charge of generating read/write events for this session */
+ private final SelectorLoop selectorLoop;
+
+ /** the socket configuration */
+ private final TcpSessionConfig configuration;
+
+ /** the future representing this session connection operation (client only) */
+ private ConnectFuture connectFuture;
+
+ /** The associated selectionKey */
+ private SelectionKey selectionKey;
+
+ /** The Direct Buffer used to send data */
+ private ByteBuffer sendBuffer;
+
+ /** The size of the buffer configured in the socket to send data */
+ private int sendBufferSize;
+
+ /* No qualifier*/NioTcpSession(final IoService service, final SocketChannel channel,
+ final SelectorLoop selectorLoop, final IdleChecker idleChecker) {
+ super(service, channel, idleChecker);
+ this.selectorLoop = selectorLoop;
+ this.configuration = new ProxyTcpSessionConfig(channel.socket());
+ sendBufferSize = configuration.getSendBufferSize();
+ sendBuffer = ByteBuffer.allocateDirect(sendBufferSize);
+ }
+
+ void setConnectFuture(ConnectFuture connectFuture) {
+ this.connectFuture = connectFuture;
+ }
+
+ /**
+ * Get the underlying {@link SocketChannel} of this session
+ *
+ * @return the socket channel used by this session
+ */
+ SocketChannel getSocketChannel() {
+ return (SocketChannel) channel;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public InetSocketAddress getRemoteAddress() {
+ if (channel == null) {
+ return null;
+ }
+ final Socket socket = ((SocketChannel) channel).socket();
+
+ if (socket == null) {
+ return null;
+ }
+
+ return (InetSocketAddress) socket.getRemoteSocketAddress();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public InetSocketAddress getLocalAddress() {
+ if (channel == null) {
+ return null;
+ }
+
+ final Socket socket = ((SocketChannel) channel).socket();
+
+ if (socket == null) {
+ return null;
+ }
+
+ return (InetSocketAddress) socket.getLocalSocketAddress();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void suspendRead() {
+ // TODO
+ throw new RuntimeException("Not implemented");
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void suspendWrite() {
+ // TODO
+ throw new RuntimeException("Not implemented");
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected int writeDirect(Object message) {
+ try {
+ // Check that we can write into the channel
+ if (!isRegisteredForWrite()) {
+ // We don't have pending writes
+ return ((SocketChannel) channel).write((ByteBuffer) message);
+ } else {
+ return -1;
+ }
+ } catch (final IOException e) {
+ LOG.error("Exception while reading : ", e);
+ processException(e);
+
+ return -1;
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected ByteBuffer convertToDirectBuffer(WriteRequest writeRequest, boolean createNew) {
+ ByteBuffer message = (ByteBuffer) writeRequest.getMessage();
+
+ if (!message.isDirect()) {
+ int remaining = message.remaining();
+
+ if ((remaining > sendBufferSize) || createNew) {
+ ByteBuffer directBuffer = ByteBuffer.allocateDirect(remaining);
+ directBuffer.put(message);
+ directBuffer.flip();
+ writeRequest.setMessage(directBuffer);
+
+ return directBuffer;
+ } else {
+ sendBuffer.clear();
+ sendBuffer.put(message);
+ sendBuffer.flip();
+ writeRequest.setMessage(sendBuffer);
+
+ return sendBuffer;
+ }
+ }
+
+ return message;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void resumeRead() {
+ // TODO
+ throw new RuntimeException("Not implemented");
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void resumeWrite() {
+ // TODO
+ throw new RuntimeException("Not implemented");
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean isReadSuspended() {
+ // TODO
+ return false;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean isWriteSuspended() {
+ // TODO
+ throw new RuntimeException("Not implemented");
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public TcpSessionConfig getConfig() {
+ return configuration;
+ }
+
+ /**
+ * Set this session status as connected. To be called by the processor selecting/polling this session.
+ */
+ void setConnected() {
+ if (!isCreated()) {
+ throw new RuntimeException("Trying to open a non created session");
+ }
+
+ state = SessionState.CONNECTED;
+
+ if (connectFuture != null) {
+ connectFuture.complete(this);
+ connectFuture = null; // free some memory
+ }
+
+ processSessionOpen();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected void channelClose() {
+ try {
+ selectorLoop.unregister(this, channel);
+ channel.close();
+ } catch (final IOException e) {
+ LOG.error("Exception while closing the channel : ", e);
+ processException(e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void flushWriteQueue() {
+ // register for write
+ selectorLoop.modifyRegistration(false, !isReadSuspended(), true, this, channel, true);
+ }
+
+ /**
+ * Process a read operation : read the data from the channel and push them to the chain.
+ *
+ * @param readBuffer The buffer that will contain the read data
+ */
+ private void processRead(final ByteBuffer readBuffer) {
+ try {
+ LOG.debug("readable session : {}", this);
+
+ // First reset the buffer from what it contained before
+ readBuffer.clear();
+
+ // Read everything we can up to the buffer size
+ final int readCount = ((SocketChannel) channel).read(readBuffer);
+
+ LOG.debug("read {} bytes", readCount);
+
+ if (readCount < 0) {
+ // session closed by the remote peer
+ LOG.debug("session closed by the remote peer");
+ close(true);
+ } else if (readCount > 0) {
+ // we have read some data
+ // limit at the current position & rewind buffer back to start &
+ // push to the chain
+ readBuffer.flip();
+
+ if (isSecured()) {
+ // We are reading data over a SSL/TLS encrypted connection.
+ // Redirect the processing to the SslHelper class.
+ final SslHelper sslHelper = getAttribute(SSL_HELPER, null);
+
+ if (sslHelper == null) {
+ throw new IllegalStateException();
+ }
+
+ sslHelper.processRead(this, readBuffer);
+ } else {
+ // Plain message, not encrypted : go directly to the chain
+ processMessageReceived(readBuffer);
+ }
+
+ // Update the session idle status
+ idleChecker.sessionRead(this, System.currentTimeMillis());
+ }
+ } catch (final IOException e) {
+ LOG.error("Exception while reading : ", e);
+ processException(e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void ready(final boolean accept, boolean connect, final boolean read, final ByteBuffer readBuffer,
+ final boolean write) {
+ LOG.debug("session {} ready for accept={}, connect={}, read={}, write={}", new Object[] { this, accept,
+ connect, read, write });
+ if (connect) {
+ try {
+
+ boolean isConnected = ((SocketChannel) channel).finishConnect();
+
+ if (!isConnected) {
+ LOG.error("unable to connect session {}", this);
+ } else {
+ // cancel current registration for connection
+ selectionKey.cancel();
+ selectionKey = null;
+
+ // Register for reading
+ selectorLoop.register(false, false, true, false, this, channel, new RegistrationCallback() {
+
+ @Override
+ public void done(SelectionKey selectionKey) {
+ setConnected();
+ }
+ });
+ }
+ } catch (IOException e) {
+ LOG.debug("Connection error, we cancel the future", e);
+ if (connectFuture != null) {
+ connectFuture.error(e);
+ }
+ }
+ }
+
+ if (read) {
+ processRead(readBuffer);
+ }
+
+ if (write) {
+ processWrite(selectorLoop);
+ }
+ if (accept) {
+ throw new IllegalStateException("accept event should never occur on NioTcpSession");
+ }
+ }
+
+ void setSelectionKey(SelectionKey key) {
+ this.selectionKey = key;
+ }
+
+ static class ConnectFuture extends AbstractIoFuture<IoSession> {
+
+ @Override
+ protected boolean cancelOwner(boolean mayInterruptIfRunning) {
+ return false;
+ }
+
+ /**
+ * session connected
+ */
+ public void complete(IoSession session) {
+ setResult(session);
+ }
+
+ /**
+ * connection error
+ */
+ public void error(Exception e) {
+ setException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/mina/blob/0de06254/core/src/main/java/org/apache/mina/transport/nio/udp/NioUdpClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mina/transport/nio/udp/NioUdpClient.java b/core/src/main/java/org/apache/mina/transport/nio/udp/NioUdpClient.java
new file mode 100644
index 0000000..8ccfd98
--- /dev/null
+++ b/core/src/main/java/org/apache/mina/transport/nio/udp/NioUdpClient.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.nio.udp;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.channels.DatagramChannel;
+
+import org.apache.mina.api.IoFuture;
+import org.apache.mina.api.IoSession;
+import org.apache.mina.api.IoSessionConfig;
+import org.apache.mina.service.executor.IoHandlerExecutor;
+import org.apache.mina.transport.nio.NioSelectorLoop;
+import org.apache.mina.transport.nio.SelectorLoop;
+import org.apache.mina.transport.udp.AbstractUdpClient;
+
+/**
+ * TODO
+ *
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ */
+public class NioUdpClient extends AbstractUdpClient {
+ /** the SelectorLoop for connecting the sessions */
+ // This is final, so that we know if it's not initialized
+ private final SelectorLoop connectSelectorLoop;
+
+ /**
+ * Create a new instance of NioUdpClient
+ */
+ public NioUdpClient() {
+ this(null);
+ }
+
+ /**
+ * Create a new instance of NioUdpClient
+ */
+ public NioUdpClient(IoHandlerExecutor ioHandlerExecutor) {
+ super(ioHandlerExecutor);
+ connectSelectorLoop = new NioSelectorLoop("connect", 0);
+ }
+
+ @Override
+ public IoSessionConfig getSessionConfig() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public IoFuture<IoSession> connect(SocketAddress remoteAddress) throws IOException {
+ DatagramChannel ch = DatagramChannel.open();
+
+ if (remoteAddress != null) {
+ ch.socket().bind(remoteAddress);
+ ch.connect(remoteAddress);
+ }
+
+ return null;
+ }
+
+ @Override
+ public IoFuture<IoSession> connect(SocketAddress remoteAddress, SocketAddress localAddress) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/mina/blob/0de06254/core/src/main/java/org/apache/mina/transport/nio/udp/NioUdpServer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mina/transport/nio/udp/NioUdpServer.java b/core/src/main/java/org/apache/mina/transport/nio/udp/NioUdpServer.java
new file mode 100644
index 0000000..ebc6a7c
--- /dev/null
+++ b/core/src/main/java/org/apache/mina/transport/nio/udp/NioUdpServer.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.nio.udp;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.mina.api.IdleStatus;
+import org.apache.mina.service.executor.IoHandlerExecutor;
+import org.apache.mina.service.executor.OrderedHandlerExecutor;
+import org.apache.mina.service.idlechecker.IdleChecker;
+import org.apache.mina.service.idlechecker.IndexedIdleChecker;
+import org.apache.mina.transport.nio.NioSelectorLoop;
+import org.apache.mina.transport.nio.SelectorListener;
+import org.apache.mina.transport.nio.SelectorLoop;
+import org.apache.mina.transport.udp.AbstractUdpServer;
+import org.apache.mina.transport.udp.UdpSessionConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements a UDP NIO based server.
+ *
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ */
+public class NioUdpServer extends AbstractUdpServer implements SelectorListener {
+
+ static final Logger LOG = LoggerFactory.getLogger(NioUdpServer.class);
+
+ // the bound local address
+ private SocketAddress address = null;
+
+ // used for detecting idle sessions
+ private final IdleChecker idleChecker = new IndexedIdleChecker();
+
+ // the inner channel for read/write UDP datagrams
+ private DatagramChannel datagramChannel = null;
+
+ // the key used for selecting read event
+ private SelectionKey readKey = null;
+
+ // list of all the sessions by remote socket address
+ private final Map<SocketAddress /* remote socket address */, NioUdpSession> sessions = new ConcurrentHashMap<SocketAddress, NioUdpSession>();
+
+ /** The selector loop used to incoming data */
+ private final SelectorLoop readSelectorLoop;
+
+ /**
+ * Create an UDP server with a new selector pool of default size and a {@link IoHandlerExecutor} of default type (
+ * {@link OrderedHandlerExecutor})
+ */
+ public NioUdpServer() {
+ this(new NioSelectorLoop("accept", 0), null);
+ }
+
+ /**
+ * Create an UDP server with a new selector pool of default size and a {@link IoHandlerExecutor} of default type (
+ * {@link OrderedHandlerExecutor})
+ *
+ * @param sessionConfig The configuration to use for this server
+ */
+ public NioUdpServer(UdpSessionConfig config) {
+ this(config, new NioSelectorLoop("accept", 0), null);
+ }
+
+ /**
+ * Create an UDP server with provided selector loops pool
+ *
+ * @param acceptSelectorLoop the selector loop for handling accept events (connection of new session)
+ * @param readWriteSelectorLoop the pool of selector loop for handling read/write events of connected sessions
+ * @param ioHandlerExecutor used for executing IoHandler event in another pool of thread (not in the low level I/O
+ * one). Use <code>null</code> if you don't want one. Be careful, the IoHandler processing will block the I/O
+ * operations.
+ */
+ public NioUdpServer(SelectorLoop readSelectorLoop, IoHandlerExecutor handlerExecutor) {
+ super(handlerExecutor);
+ this.readSelectorLoop = readSelectorLoop;
+ }
+
+ /**
+ * Create an UDP server with provided selector loops pool
+ *
+ * @param sessionConfig The configuration to use for this server
+ * @param acceptSelectorLoop the selector loop for handling accept events (connection of new session)
+ * @param ioHandlerExecutor used for executing IoHandler event in another pool of thread (not in the low level I/O
+ * one). Use <code>null</code> if you don't want one. Be careful, the IoHandler processing will block the I/O
+ * operations.
+ */
+ public NioUdpServer(UdpSessionConfig config, SelectorLoop readSelectorLoop, IoHandlerExecutor handlerExecutor) {
+ super(config, handlerExecutor);
+ this.readSelectorLoop = readSelectorLoop;
+ }
+
+ /**
+ * Get the inner datagram channel for read and write operations. To be called by the {@link NioSelectorProcessor}
+ *
+ * @return the datagram channel bound to this {@link NioUdpServer}.
+ */
+ public DatagramChannel getDatagramChannel() {
+ return datagramChannel;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public SocketAddress getBoundAddress() {
+ return address;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void bind(final int port) throws IOException {
+ bind(new InetSocketAddress(port));
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void bind(final SocketAddress localAddress) throws IOException {
+ if (localAddress == null) {
+ // We should at least have one address to bind on
+ throw new IllegalArgumentException("LocalAdress cannot be null");
+ }
+
+ // check if the address is already bound
+ if (this.address != null) {
+ throw new IOException("address " + address + " already bound");
+ }
+ address = localAddress;
+
+ LOG.info("binding address {}", localAddress);
+
+ datagramChannel = DatagramChannel.open();
+
+ datagramChannel.socket().setReuseAddress(isReuseAddress());
+ datagramChannel.socket().bind(address);
+ datagramChannel.configureBlocking(false);
+
+ readSelectorLoop.register(false, false, true, false, this, datagramChannel, null);
+
+ // it's the first address bound, let's fire the event
+ this.fireServiceActivated();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void unbind() throws IOException {
+ LOG.info("unbinding {}", address);
+ if (this.address == null) {
+ throw new IllegalStateException("server not bound");
+ }
+
+ readSelectorLoop.unregister(this, datagramChannel);
+ datagramChannel.socket().close();
+ datagramChannel.close();
+
+ this.address = null;
+ this.fireServiceInactivated();
+ }
+
+ /**
+ * @return the readKey
+ */
+ public SelectionKey getReadKey() {
+ return readKey;
+ }
+
+ /**
+ * @param readKey the readKey to set
+ */
+ public void setReadKey(final SelectionKey readKey) {
+ this.readKey = readKey;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void ready(final boolean accept, boolean connect, final boolean read, final ByteBuffer readBuffer,
+ final boolean write) {
+ // Process the reads first
+ try {
+ final SocketAddress source = datagramChannel.receive(readBuffer);
+ NioUdpSession session = null;
+
+ // let's find the corresponding session
+ if (source != null) {
+ session = sessions.get(source);
+
+ if (session == null) {
+ session = createSession(source, datagramChannel);
+ }
+ if (read) {
+ LOG.debug("readable datagram for UDP service : {}", this);
+ readBuffer.clear();
+
+ readBuffer.flip();
+
+ LOG.debug("read {} bytes form {}", readBuffer.remaining(), source);
+
+ session.receivedDatagram(readBuffer);
+
+ }
+
+ // Now, process the writes
+ if (write) {
+ session.processWrite(readSelectorLoop);
+ }
+ } else {
+ LOG.debug("Do data to read");
+ }
+
+ } catch (final IOException ex) {
+ LOG.error("IOException while reading the socket", ex);
+ }
+ }
+
+ private NioUdpSession createSession(SocketAddress remoteAddress, DatagramChannel datagramChannel)
+ throws IOException {
+ LOG.debug("create session");
+ UdpSessionConfig config = getSessionConfig();
+ SocketAddress localAddress = new InetSocketAddress(datagramChannel.socket().getLocalAddress(), datagramChannel
+ .socket().getLocalPort());
+ final NioUdpSession session = new NioUdpSession(this, idleChecker, datagramChannel, localAddress, remoteAddress);
+
+ // apply idle configuration
+ session.getConfig().setIdleTimeInMillis(IdleStatus.READ_IDLE, config.getIdleTimeInMillis(IdleStatus.READ_IDLE));
+ session.getConfig().setIdleTimeInMillis(IdleStatus.WRITE_IDLE,
+ config.getIdleTimeInMillis(IdleStatus.WRITE_IDLE));
+
+ // apply the default service socket configuration
+
+ Boolean reuseAddress = config.isReuseAddress();
+
+ if (reuseAddress != null) {
+ session.getConfig().setReuseAddress(reuseAddress);
+ }
+
+ Integer readBufferSize = config.getReadBufferSize();
+
+ if (readBufferSize != null) {
+ session.getConfig().setReadBufferSize(readBufferSize);
+ }
+
+ Integer sendBufferSize = config.getSendBufferSize();
+
+ if (sendBufferSize != null) {
+ session.getConfig().setSendBufferSize(sendBufferSize);
+ }
+
+ Integer trafficClass = config.getTrafficClass();
+
+ if (trafficClass != null) {
+ session.getConfig().setTrafficClass(trafficClass);
+ }
+
+ // Manage the Idle status
+ idleChecker.sessionRead(session, System.currentTimeMillis());
+ idleChecker.sessionWritten(session, System.currentTimeMillis());
+
+ sessions.put(remoteAddress, session);
+
+ // Inform the handler that the session has been created
+ session.setConnected();
+
+ return session;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mina/blob/0de06254/core/src/main/java/org/apache/mina/transport/nio/udp/NioUdpSession.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mina/transport/nio/udp/NioUdpSession.java b/core/src/main/java/org/apache/mina/transport/nio/udp/NioUdpSession.java
new file mode 100644
index 0000000..6f099bc
--- /dev/null
+++ b/core/src/main/java/org/apache/mina/transport/nio/udp/NioUdpSession.java
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.mina.transport.nio.udp;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+
+import org.apache.mina.api.IoFuture;
+import org.apache.mina.api.IoService;
+import org.apache.mina.service.idlechecker.IdleChecker;
+import org.apache.mina.session.AbstractIoSession;
+import org.apache.mina.session.WriteRequest;
+import org.apache.mina.transport.nio.SelectorListener;
+import org.apache.mina.transport.udp.UdpSessionConfig;
+import org.apache.mina.util.AbstractIoFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A UDP session based on NIO
+ *
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ */
+public class NioUdpSession extends AbstractIoSession implements SelectorListener {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NioUdpSession.class);
+
+ private final SocketAddress localAddress;
+
+ private final SocketAddress remoteAddress;
+
+ /** the socket configuration */
+ private final UdpSessionConfig configuration;
+
+ /** we pre-allocate a close future for lock-less {@link #close(boolean)} */
+ private final IoFuture<Void> closeFuture = new AbstractIoFuture<Void>() {
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected boolean cancelOwner(boolean mayInterruptIfRunning) {
+ // we don't cancel close
+ return false;
+ }
+ };
+
+ /**
+ * @param service
+ * @param writeProcessor
+ * @param idleChecker
+ */
+ /* No qualifier*/NioUdpSession(IoService service, IdleChecker idleChecker, DatagramChannel datagramChannel,
+ SocketAddress localAddress, SocketAddress remoteAddress) {
+ super(service, datagramChannel, idleChecker);
+ this.localAddress = localAddress;
+ this.remoteAddress = remoteAddress;
+ this.config = service.getSessionConfig();
+ this.configuration = (UdpSessionConfig) this.config;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected void channelClose() {
+ // No inner socket to close for UDP
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void flushWriteQueue() {
+ // TODO flush queue
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public SocketAddress getRemoteAddress() {
+ return remoteAddress;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public SocketAddress getLocalAddress() {
+ return localAddress;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public IoFuture<Void> close(boolean immediately) {
+ switch (state) {
+ case CREATED:
+ LOG.error("Session {} not opened", this);
+ throw new IllegalStateException("cannot close an not opened session");
+ case CONNECTED:
+ case CLOSING:
+ if (immediately) {
+ state = SessionState.CLOSED;
+ } else {
+ // we wait for the write queue to be depleted
+ state = SessionState.CLOSING;
+ }
+ break;
+ case CLOSED:
+ LOG.warn("Already closed session {}", this);
+ break;
+ default:
+ throw new IllegalStateException("not implemented session state : " + state);
+ }
+ return closeFuture;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void suspendRead() {
+ // TODO
+ throw new RuntimeException("Not implemented");
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void suspendWrite() {
+ // TODO
+ throw new RuntimeException("Not implemented");
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void resumeRead() {
+ // TODO
+ throw new RuntimeException("Not implemented");
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void resumeWrite() {
+ // TODO
+ throw new RuntimeException("Not implemented");
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean isReadSuspended() {
+ // TODO
+ throw new RuntimeException("Not implemented");
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean isWriteSuspended() {
+ // TODO
+ throw new RuntimeException("Not implemented");
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public UdpSessionConfig getConfig() {
+ return configuration;
+ }
+
+ /**
+ * Called when the session received a datagram.
+ *
+ * @param readBuffer the received datagram
+ */
+ void receivedDatagram(ByteBuffer readBuffer) {
+ processMessageReceived(readBuffer);
+ idleChecker.sessionRead(this, System.currentTimeMillis());
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected int writeDirect(Object message) {
+ try {
+ // Check that we can write into the channel
+ if (!isRegisteredForWrite()) {
+ // We don't have pending writes
+ // First, connect if we aren't already connected
+ if (!((DatagramChannel) channel).isConnected()) {
+ ((DatagramChannel) channel).connect(remoteAddress);
+ }
+
+ // And try to write the data. We will either write them all,
+ // or none
+ return ((DatagramChannel) channel).write((ByteBuffer) message);
+ } else {
+ System.out.println("Cannot write");
+ return -1;
+ }
+ } catch (final IOException e) {
+ LOG.error("Exception while reading : ", e);
+ processException(e);
+
+ return -1;
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected ByteBuffer convertToDirectBuffer(WriteRequest writeRequest, boolean createNew) {
+ ByteBuffer message = (ByteBuffer) writeRequest.getMessage();
+
+ if (!message.isDirect()) {
+ int remaining = message.remaining();
+
+ ByteBuffer directBuffer = ByteBuffer.allocateDirect(remaining);
+ directBuffer.put(message);
+ directBuffer.flip();
+ writeRequest.setMessage(directBuffer);
+
+ return directBuffer;
+ }
+
+ return message;
+ }
+
+ void setSelectionKey(SelectionKey key) {
+ //this.selectionKey = key;
+ }
+
+ /**
+ * Set this session status as connected. To be called by the processor selecting/polling this session.
+ */
+ void setConnected() {
+ if (!isCreated()) {
+ throw new RuntimeException("Trying to open a non created session");
+ }
+
+ state = SessionState.CONNECTED;
+
+ /*if (connectFuture != null) {
+ connectFuture.complete(this);
+ connectFuture = null; // free some memory
+ }*/
+
+ processSessionOpen();
+ }
+
+ @Override
+ public void ready(boolean accept, boolean connect, boolean read, ByteBuffer readBuffer, boolean write) {
+ }
+}