You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by jv...@apache.org on 2012/05/22 17:02:29 UTC
svn commit: r1341509 - in /mina/trunk:
core/src/main/java/org/apache/mina/service/
core/src/main/java/org/apache/mina/transport/tcp/
core/src/main/java/org/apache/mina/transport/tcp/nio/
core/src/main/java/org/apache/mina/transport/udp/ core/src/main/j...
Author: jvermillard
Date: Tue May 22 15:02:28 2012
New Revision: 1341509
URL: http://svn.apache.org/viewvc?rev=1341509&view=rev
Log:
removed ugly selector factory, preparing for UDP server
Removed:
mina/trunk/core/src/main/java/org/apache/mina/service/SelectorFactory.java
Modified:
mina/trunk/core/src/main/java/org/apache/mina/service/OneThreadSelectorStrategy.java
mina/trunk/core/src/main/java/org/apache/mina/service/SelectorProcessor.java
mina/trunk/core/src/main/java/org/apache/mina/service/SelectorStrategy.java
mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/nio/NioTcpServer.java
mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpServer.java
mina/trunk/core/src/main/java/org/apache/mina/transport/udp/nio/NioUdpServer.java
mina/trunk/examples/src/main/java/org/apache/mina/examples/echoserver/NioEchoServer.java
mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpTest.java
mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpsTest.java
mina/trunk/examples/src/main/java/org/apache/mina/examples/ldap/LdapTest.java
Modified: mina/trunk/core/src/main/java/org/apache/mina/service/OneThreadSelectorStrategy.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/service/OneThreadSelectorStrategy.java?rev=1341509&r1=1341508&r2=1341509&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/service/OneThreadSelectorStrategy.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/service/OneThreadSelectorStrategy.java Tue May 22 15:02:28 2012
@@ -19,9 +19,6 @@
*/
package org.apache.mina.service;
-import java.io.IOException;
-import java.net.SocketAddress;
-
import org.apache.mina.api.IoSession;
/**
@@ -31,36 +28,28 @@ import org.apache.mina.api.IoSession;
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*
*/
-public class OneThreadSelectorStrategy implements SelectorStrategy {
+public class OneThreadSelectorStrategy<PROCESSOR extends SelectorProcessor> implements SelectorStrategy<PROCESSOR> {
/** The processor in charge of the messages processing */
- private SelectorProcessor processor;
+ private final PROCESSOR processor;
- /**
- * Creates an instance of the OneThreadSelectorStrategy class
- * @param selectorFactory The Selector factory to use to create the processor
- */
- public OneThreadSelectorStrategy(SelectorFactory selectorFactory) {
- this.processor = selectorFactory.getNewSelector("uniqueSelector", this);
+ public OneThreadSelectorStrategy(PROCESSOR processor) {
+ processor.setStrategy(this);
+ this.processor = processor;
}
@Override
- public SelectorProcessor getSelectorForBindNewAddress() {
+ public PROCESSOR getSelectorForBindNewAddress() {
return processor;
}
@Override
- public SelectorProcessor getSelectorForNewSession(SelectorProcessor acceptingProcessor) {
+ public PROCESSOR getSelectorForNewSession(SelectorProcessor acceptingProcessor) {
return processor;
}
@Override
- public SelectorProcessor getSelectorForWrite(IoSession session) {
+ public PROCESSOR getSelectorForWrite(IoSession session) {
return processor;
}
- @Override
- public void unbind(SocketAddress address) throws IOException {
- processor.unbind(address);
- }
-
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/service/SelectorProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/service/SelectorProcessor.java?rev=1341509&r1=1341508&r2=1341509&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/service/SelectorProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/service/SelectorProcessor.java Tue May 22 15:02:28 2012
@@ -23,10 +23,11 @@ package org.apache.mina.service;
import java.io.IOException;
import java.net.SocketAddress;
-import org.apache.mina.api.IoServer;
import org.apache.mina.api.IoService;
import org.apache.mina.api.IoSession;
import org.apache.mina.session.AbstractIoSession;
+import org.apache.mina.transport.tcp.AbstractTcpServer;
+import org.apache.mina.transport.udp.AbstractUdpServer;
/**
* A processor in charge of a group of client session and server sockets.
@@ -36,6 +37,9 @@ import org.apache.mina.session.AbstractI
*/
public interface SelectorProcessor {
+
+ void setStrategy(SelectorStrategy<?> strategy);
+
/**
* create a session for a freshly accepted client socket
* @param service
@@ -44,11 +48,20 @@ public interface SelectorProcessor {
void createSession(IoService service, Object clientSocket) throws IOException;
/**
- * Bind and start processing this new server address
+ * Bind and start processing this new server TCP address
+ * @param server the server for the new address
* @param address local address to bind
* @throws IOException exception thrown if any problem occurs while binding
*/
- void bindAndAcceptAddress(IoServer server, SocketAddress address) throws IOException;
+ void bindTcpServer(AbstractTcpServer server, SocketAddress address) throws IOException;
+
+ /**
+ * Bind and start processing this new server UDP address
+ * @param server the server for the new address
+ * @param address local address to bind
+ * @throws IOException exception thrown if any problem occurs while binding
+ */
+ void bindUdpServer(AbstractUdpServer server, SocketAddress address) throws IOException;
/**
* Stop processing and unbind this server address
Modified: mina/trunk/core/src/main/java/org/apache/mina/service/SelectorStrategy.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/service/SelectorStrategy.java?rev=1341509&r1=1341508&r2=1341509&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/service/SelectorStrategy.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/service/SelectorStrategy.java Tue May 22 15:02:28 2012
@@ -19,23 +19,20 @@
*/
package org.apache.mina.service;
-import java.io.IOException;
-import java.net.SocketAddress;
-
import org.apache.mina.api.IoSession;
/**
* Strategy for balancing server socket and client socket to different selecting/polling threads.
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
-public interface SelectorStrategy {
+public interface SelectorStrategy<PROCESSOR extends SelectorProcessor> {
/**
* Provide a {@link SelectorProcessor} for a newly accepted {@link IoSession}.
* @param acceptingProcessor the selector which accepted the {@link IoSession}
* @return a processor for processing the new session
*/
- SelectorProcessor getSelectorForNewSession(SelectorProcessor acceptingProcessor);
+ PROCESSOR getSelectorForNewSession(SelectorProcessor acceptingProcessor);
/**
* Provide a {@link SelectorProcessor} for a {@link IoSession} which need to write data.
@@ -44,19 +41,13 @@ public interface SelectorStrategy {
* @param session the session in need of writing
* @return the selector processor for handling this session write events
*/
- SelectorProcessor getSelectorForWrite(IoSession session);
+ PROCESSOR getSelectorForWrite(IoSession session);
/**
* Provide a {@link SelectorProcessor} for processing a newly bound address.
* The processor will accept the incoming connections.
* @return a {@link SelectorProcessor} for processing a newly bound address
*/
- SelectorProcessor getSelectorForBindNewAddress();
+ PROCESSOR getSelectorForBindNewAddress();
- /**
- * Unbind an address and remove it from its {@link SelectorProcessor}
- * @param address the address to be unbound and removed
- * @throws IOException thrown if any problem occurs while unbinding
- */
- void unbind(SocketAddress address) throws IOException;
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java?rev=1341509&r1=1341508&r2=1341509&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java Tue May 22 15:02:28 2012
@@ -25,6 +25,7 @@ import static org.apache.mina.api.IoSess
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
@@ -54,6 +55,7 @@ import org.apache.mina.session.AbstractI
import org.apache.mina.session.DefaultWriteFuture;
import org.apache.mina.session.SslHelper;
import org.apache.mina.session.WriteRequest;
+import org.apache.mina.transport.udp.AbstractUdpServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,551 +66,618 @@ import org.slf4j.LoggerFactory;
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
public class NioSelectorProcessor implements SelectorProcessor {
- /** A logger for this class */
- private static final Logger LOGGER = LoggerFactory.getLogger(NioSelectorProcessor.class);
-
- /**
- * A timeout used for the select, as we need to get out to deal with idle
- * sessions
- */
- private static final long SELECT_TIMEOUT = 1000L;
-
- private final SelectorStrategy strategy;
-
- private final Map<SocketAddress, ServerSocketChannel> serverSocketChannels = new ConcurrentHashMap<SocketAddress, ServerSocketChannel>();
-
- /** Read buffer for all the incoming bytes (default to 64Kb) */
- private final ByteBuffer readBuffer = ByteBuffer.allocate(64 * 1024);
-
- /** the thread polling and processing the I/O events */
- private SelectorWorker worker = null;
-
- /** helper for detecting idleing sessions */
- private final IdleChecker idleChecker = new IndexedIdleChecker();
-
- /** A queue containing the servers to bind to this selector */
- private final Queue<Object[]> serversToAdd = new ConcurrentLinkedQueue<Object[]>();
-
- /** server to remove of the selector */
- private final Queue<ServerSocketChannel> serversToRemove = new ConcurrentLinkedQueue<ServerSocketChannel>();
-
- /** new session freshly accepted, placed here for being added to the selector */
- private final Queue<NioTcpSession> sessionsToConnect = new ConcurrentLinkedQueue<NioTcpSession>();
-
- /** session to be removed of the selector */
- private final Queue<NioTcpSession> sessionsToClose = new ConcurrentLinkedQueue<NioTcpSession>();
-
- /** A queue used to store the sessions to be flushed */
- private final Queue<NioTcpSession> flushingSessions = new ConcurrentLinkedQueue<NioTcpSession>();
-
- private Selector selector;
-
- // Lock for Selector worker, using default. can look into fairness later.
- // We need to think about a lock less mechanism here.
- private final Lock workerLock = new ReentrantLock();
-
- public NioSelectorProcessor(final String name, final SelectorStrategy strategy) {
- this.strategy = strategy;
- }
-
- /**
- * Add a bound server channel for starting accepting new client connections.
- *
- * @param serverChannel
- */
- private void add(final ServerSocketChannel serverChannel, final IoServer server) {
- LOGGER.debug("adding a server channel {} for server {}", serverChannel, server);
- this.serversToAdd.add(new Object[] { serverChannel, server });
- this.wakeupWorker();
- }
-
- /**
- * Wake the I/O worker thread and if none exists, create a new one
- * FIXME : too much locking there ?
- */
- private void wakeupWorker() {
- this.workerLock.lock();
- try {
- if (this.worker == null) {
- this.worker = new SelectorWorker();
- this.worker.start();
- }
- } finally {
- this.workerLock.unlock();
- }
-
- if (this.selector != null) {
- this.selector.wakeup();
- }
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void bindAndAcceptAddress(final IoServer server, final SocketAddress address) throws IOException {
- ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
-
- // FIXME : should be "genericified"
- if (server instanceof AbstractTcpServer) {
- serverSocketChannel.socket().setReuseAddress(((AbstractTcpServer) server).isReuseAddress());
- }
- serverSocketChannel.socket().bind(address);
- serverSocketChannel.configureBlocking(false);
- this.serverSocketChannels.put(address, serverSocketChannel);
- this.add(serverSocketChannel, server);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void unbind(final SocketAddress address) throws IOException {
- ServerSocketChannel channel = this.serverSocketChannels.get(address);
- channel.socket().close();
- channel.close();
- if (this.serverSocketChannels.remove(address) == null) {
- LOGGER.warn("The server channel for address {} was already unbound", address);
- }
- LOGGER.debug("Removing a server channel {}", channel);
- this.serversToRemove.add(channel);
- this.wakeupWorker();
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void createSession(final IoService service, final Object clientSocket) throws SSLException {
- LOGGER.debug("create session");
- final SocketChannel socketChannel = (SocketChannel) clientSocket;
- final TcpSessionConfig config = (TcpSessionConfig) service.getSessionConfig();
- final NioTcpSession session = new NioTcpSession(service, socketChannel,
- this.strategy.getSelectorForNewSession(this));
-
- try {
- socketChannel.configureBlocking(false);
- } catch (IOException e) {
- LOGGER.error("Unexpected exception, while configuring socket as non blocking", e);
- throw new RuntimeIoException("cannot configure socket as non-blocking", e);
- }
- // apply idle configuration
- session.getConfig().setIdleTimeInMillis(IdleStatus.READ_IDLE, config.getIdleTimeInMillis(IdleStatus.READ_IDLE));
- session.getConfig().setIdleTimeInMillis(IdleStatus.WRITE_IDLE,
- config.getIdleTimeInMillis(IdleStatus.WRITE_IDLE));
-
- // apply the default service socket configuration
- Boolean keepAlive = config.isKeepAlive();
-
- if (keepAlive != null) {
- session.getConfig().setKeepAlive(keepAlive);
- }
-
- Boolean oobInline = config.isOobInline();
-
- if (oobInline != null) {
- session.getConfig().setOobInline(oobInline);
- }
-
- Boolean reuseAddress = config.isReuseAddress();
-
- if (reuseAddress != null) {
- session.getConfig().setReuseAddress(reuseAddress);
- }
-
- Boolean tcpNoDelay = config.isTcpNoDelay();
-
- if (tcpNoDelay != null) {
- session.getConfig().setTcpNoDelay(tcpNoDelay);
- }
-
- Integer receiveBufferSize = config.getReceiveBufferSize();
-
- if (receiveBufferSize != null) {
- session.getConfig().setReceiveBufferSize(receiveBufferSize);
- }
-
- Integer sendBufferSize = config.getSendBufferSize();
-
- if (sendBufferSize != null) {
- session.getConfig().setSendBufferSize(sendBufferSize);
- }
-
- Integer trafficClass = config.getTrafficClass();
-
- if (trafficClass != null) {
- session.getConfig().setTrafficClass(trafficClass);
- }
-
- Integer soLinger = config.getSoLinger();
-
- if (soLinger != null) {
- session.getConfig().setSoLinger(soLinger);
- }
-
- // Set the secured flag if the service is to be used over SSL/TLS
- if (config.isSecured()) {
- session.initSecure(config.getSslContext());
- }
-
- // event session created
- session.processSessionCreated();
-
- // add the session to the queue for being added to the selector
- this.sessionsToConnect.add(session);
- this.wakeupWorker();
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void flush(final AbstractIoSession session) {
- LOGGER.debug("scheduling session {} for writing", session);
- // add the session to the list of session to be registered for writing
- this.flushingSessions.add((NioTcpSession) session);
- // wake the selector for unlocking the I/O thread
- this.wakeupWorker();
- }
-
- /**
- * The worker processing incoming session creation, session destruction requests, session write and reads.
- * It will also bind new servers.
- */
- private class SelectorWorker extends Thread {
- // map for finding the keys associated with a given server
- private final Map<ServerSocketChannel, SelectionKey> serverKey = new HashMap<ServerSocketChannel, SelectionKey>();
-
- // map for finding read keys associated with a given session
- private final Map<NioTcpSession, SelectionKey> sessionReadKey = new HashMap<NioTcpSession, SelectionKey>();
-
- @Override
- public void run() {
- try {
- if (NioSelectorProcessor.this.selector == null) {
- LOGGER.debug("opening a new selector");
-
- try {
- NioSelectorProcessor.this.selector = Selector.open();
- } catch (IOException e) {
- LOGGER.error("IOException while opening a new Selector", e);
- }
- }
-
- for (;;) {
- try {
- // pop server sockets for removing
- if (NioSelectorProcessor.this.serversToRemove.size() > 0) {
- this.processServerRemove();
- }
-
- // pop new server sockets for accepting
- if (NioSelectorProcessor.this.serversToAdd.size() > 0) {
- this.processServerAdd();
- }
-
- // pop new session for starting read/write
- if (NioSelectorProcessor.this.sessionsToConnect.size() > 0) {
- this.processConnectSessions();
- }
-
- // pop session for close, if any
- if (NioSelectorProcessor.this.sessionsToClose.size() > 0) {
- this.processCloseSessions();
- }
-
- LOGGER.debug("selecting...");
- int readyCount = NioSelectorProcessor.this.selector.select(SELECT_TIMEOUT);
- LOGGER.debug("... done selecting : {}", readyCount);
-
- if (readyCount > 0) {
- // process selected keys
- Iterator<SelectionKey> selectedKeys = NioSelectorProcessor.this.selector.selectedKeys()
- .iterator();
-
- // Loop on each SelectionKey and process any valid action
- while (selectedKeys.hasNext()) {
- SelectionKey key = selectedKeys.next();
- selectedKeys.remove();
-
- if (!key.isValid()) {
- continue;
- }
-
- NioSelectorProcessor.this.selector.selectedKeys().remove(key);
-
- if (key.isAcceptable()) {
- this.processAccept(key);
- }
-
- if (key.isReadable()) {
- this.processRead(key);
- }
-
- if (key.isWritable()) {
- this.processWrite(key);
- }
-
- }
- }
-
- // registering session with data in the write queue for
- // writing
- while (!NioSelectorProcessor.this.flushingSessions.isEmpty()) {
- this.processFlushSessions();
- }
- } catch (IOException e) {
- LOGGER.error("IOException while selecting selector", e);
- }
-
- // stop the worker if needed
- NioSelectorProcessor.this.workerLock.lock();
-
- try {
- if (NioSelectorProcessor.this.selector.keys().isEmpty()) {
- NioSelectorProcessor.this.worker = null;
- break;
- }
- } finally {
- NioSelectorProcessor.this.workerLock.unlock();
- }
-
- // check for idle events
- NioSelectorProcessor.this.idleChecker.processIdleSession(System.currentTimeMillis());
- }
- } catch (Exception e) {
- LOGGER.error("Unexpected exception : ", e);
- }
- }
-
- /**
- * Handles the servers removal
- */
- private void processServerRemove() {
- while (!NioSelectorProcessor.this.serversToRemove.isEmpty()) {
- ServerSocketChannel channel = NioSelectorProcessor.this.serversToRemove.poll();
- SelectionKey key = this.serverKey.remove(channel);
-
- if (key == null) {
- LOGGER.error("The server socket was already removed of the selector");
- } else {
- LOGGER.debug("Removing the server from this selector : {}", key);
- key.cancel();
- }
- }
- }
-
- /**
- * Handles the servers addition
- */
- private void processServerAdd() throws IOException {
- while (!NioSelectorProcessor.this.serversToAdd.isEmpty()) {
- Object[] tmp = NioSelectorProcessor.this.serversToAdd.poll();
- ServerSocketChannel channel = (ServerSocketChannel) tmp[0];
- SelectionKey key = channel.register(NioSelectorProcessor.this.selector, SelectionKey.OP_ACCEPT);
- key.attach(tmp);
- LOGGER.debug("Accepted the server on this selector : {}", key);
- }
- }
-
- /**
- * Handles all the sessions that must be connected
- */
- private void processConnectSessions() throws IOException {
- while (!NioSelectorProcessor.this.sessionsToConnect.isEmpty()) {
- NioTcpSession session = NioSelectorProcessor.this.sessionsToConnect.poll();
- SelectionKey key = session.getSocketChannel().register(NioSelectorProcessor.this.selector,
- SelectionKey.OP_READ);
- key.attach(session);
- this.sessionReadKey.put(session, key);
-
- // Switch to CONNECTED, only if the session is not secured, as the SSL Handshake
- // will occur later.
- if (!session.isSecured()) {
- session.setConnected();
-
- // fire the event
- ((AbstractIoService) session.getService()).fireSessionCreated(session);
- session.processSessionOpened();
- long time = System.currentTimeMillis();
- NioSelectorProcessor.this.idleChecker.sessionRead(session, time);
- NioSelectorProcessor.this.idleChecker.sessionWritten(session, time);
- }
- }
- }
-
- /**
- * Handles all the sessions that must be closed
- */
- private void processCloseSessions() throws IOException {
- while (!NioSelectorProcessor.this.sessionsToClose.isEmpty()) {
- NioTcpSession session = NioSelectorProcessor.this.sessionsToClose.poll();
-
- SelectionKey key = this.sessionReadKey.remove(session);
- key.cancel();
-
- // closing underlying socket
- session.getSocketChannel().close();
- // fire the event
- session.processSessionClosed();
- ((AbstractIoService) session.getService()).fireSessionDestroyed(session);
- }
- }
-
- /**
- * Processes the Accept action for the given SelectionKey
- */
- private void processAccept(final SelectionKey key) throws IOException {
- LOGGER.debug("acceptable new client {}", key);
- ServerSocketChannel serverSocket = (ServerSocketChannel) ((Object[]) key.attachment())[0];
- IoServer server = (IoServer) (((Object[]) key.attachment())[1]);
- // accepted connection
- SocketChannel newClientChannel = serverSocket.accept();
- LOGGER.debug("client accepted");
- // and give it's to the strategy
- NioSelectorProcessor.this.strategy.getSelectorForNewSession(NioSelectorProcessor.this).createSession(
- server, newClientChannel);
- }
-
- /**
- * Processes the Read action for the given SelectionKey
- */
- private void processRead(final SelectionKey key) throws IOException {
- LOGGER.debug("readable client {}", key);
- NioTcpSession session = (NioTcpSession) key.attachment();
- SocketChannel channel = session.getSocketChannel();
- NioSelectorProcessor.this.readBuffer.clear();
- int readCount = channel.read(NioSelectorProcessor.this.readBuffer);
-
- LOGGER.debug("read {} bytes", readCount);
-
- if (readCount < 0) {
- // session closed by the remote peer
- LOGGER.debug("session closed by the remote peer");
- NioSelectorProcessor.this.sessionsToClose.add(session);
- } else {
- // we have read some data
- // limit at the current position & rewind buffer back to start & push to the chain
- NioSelectorProcessor.this.readBuffer.flip();
-
- if (session.isSecured()) {
- // We are reading data over a SSL/TLS encrypted connection. Redirect
- // the processing to the SslHelper class.
- SslHelper sslHelper = session.getAttribute(SSL_HELPER, null);
-
- if (sslHelper == null) {
- throw new IllegalStateException();
- }
-
- sslHelper.processRead(session, NioSelectorProcessor.this.readBuffer);
- } else {
- // Plain message, not encrypted : go directly to the chain
- session.processMessageReceived(NioSelectorProcessor.this.readBuffer);
- }
-
- NioSelectorProcessor.this.idleChecker.sessionRead(session, System.currentTimeMillis());
- }
- }
-
- /**
- * Processes the Write action for the given SelectionKey
- */
- private void processWrite(final SelectionKey key) throws IOException {
- NioTcpSession session = (NioTcpSession) key.attachment();
-
- LOGGER.debug("writable session : {}", session);
-
- session.setNotRegisteredForWrite();
-
- // write from the session write queue
- boolean isEmpty = false;
-
- try {
- Queue<WriteRequest> queue = session.acquireWriteQueue();
-
- do {
- // get a write request from the queue
- WriteRequest wreq = queue.peek();
-
- if (wreq == null) {
- break;
- }
-
- ByteBuffer buf = (ByteBuffer) wreq.getMessage();
-
- // Note that if the connection is secured, the buffer already
- // contains encrypted data.
- int wrote = session.getSocketChannel().write(buf);
- session.incrementWrittenBytes(wrote);
- LOGGER.debug("wrote {} bytes to {}", wrote, session);
-
- NioSelectorProcessor.this.idleChecker.sessionWritten(session, System.currentTimeMillis());
-
- if (buf.remaining() == 0) {
- // completed write request, let's remove it
- queue.remove();
- // complete the future
- DefaultWriteFuture future = (DefaultWriteFuture) wreq.getFuture();
-
- if (future != null) {
- future.complete();
- }
- } else {
- // output socket buffer is full, we need
- // to give up until next selection for
- // writing
- break;
- }
- } while (!queue.isEmpty());
-
- isEmpty = queue.isEmpty();
- } finally {
- session.releaseWriteQueue();
- }
-
- // if the session is no more interested in writing, we need
- // to stop listening for OP_WRITE events
- if (isEmpty) {
- if (session.isClosing()) {
- LOGGER.debug("closing session {} have empty write queue, so we close it", session);
- // we was flushing writes, now we to the close
- session.getSocketChannel().close();
- } else {
- // a key registered for read ? (because we can have a
- // Selector for reads and another for the writes
- SelectionKey readKey = this.sessionReadKey.get(session);
-
- if (readKey != null) {
- LOGGER.debug("registering key for only reading");
- SelectionKey mykey = session.getSocketChannel().register(NioSelectorProcessor.this.selector,
- SelectionKey.OP_READ, session);
- this.sessionReadKey.put(session, mykey);
- } else {
- LOGGER.debug("cancel key for writing");
- session.getSocketChannel().keyFor(NioSelectorProcessor.this.selector).cancel();
- }
- }
- }
- }
-
- /**
- * Flushes the sessions
- */
- private void processFlushSessions() throws IOException {
- NioTcpSession session = NioSelectorProcessor.this.flushingSessions.poll();
- // a key registered for read ? (because we can have a
- // Selector for reads and another for the writes
- SelectionKey readKey = this.sessionReadKey.get(session);
-
- if (readKey != null) {
- // register for read/write
- SelectionKey key = session.getSocketChannel().register(NioSelectorProcessor.this.selector,
- SelectionKey.OP_READ | SelectionKey.OP_WRITE, session);
-
- this.sessionReadKey.put(session, key);
- } else {
- session.getSocketChannel().register(NioSelectorProcessor.this.selector, SelectionKey.OP_WRITE, session);
- }
- }
- }
+ /** A logger for this class */
+ private static final Logger LOGGER = LoggerFactory
+ .getLogger(NioSelectorProcessor.class);
+
+ /**
+ * A timeout used for the select, as we need to get out to deal with idle
+ * sessions
+ */
+ private static final long SELECT_TIMEOUT = 1000L;
+
+ private SelectorStrategy<NioSelectorProcessor> strategy;
+
+ private final Map<SocketAddress, ServerSocketChannel> serverSocketChannels = new ConcurrentHashMap<SocketAddress, ServerSocketChannel>();
+
+ private final Map<SocketAddress, DatagramChannel> datagramChannels = new ConcurrentHashMap<SocketAddress, DatagramChannel>();
+
+ /** Read buffer for all the incoming bytes (default to 64Kb) */
+ private final ByteBuffer readBuffer = ByteBuffer.allocate(64 * 1024);
+
+ /** the thread polling and processing the I/O events */
+ private SelectorWorker worker = null;
+
+ /** helper for detecting idleing sessions */
+ private final IdleChecker idleChecker = new IndexedIdleChecker();
+
+ /** A queue containing the servers to bind to this selector */
+ private final Queue<Object[]> serversToAdd = new ConcurrentLinkedQueue<Object[]>();
+
+ /** server to remove of the selector */
+ private final Queue<ServerSocketChannel> serversToRemove = new ConcurrentLinkedQueue<ServerSocketChannel>();
+
+ /**
+ * new session freshly accepted, placed here for being added to the selector
+ */
+ private final Queue<NioTcpSession> sessionsToConnect = new ConcurrentLinkedQueue<NioTcpSession>();
+
+ /** session to be removed of the selector */
+ private final Queue<NioTcpSession> sessionsToClose = new ConcurrentLinkedQueue<NioTcpSession>();
+
+ /** A queue used to store the sessions to be flushed */
+ private final Queue<NioTcpSession> flushingSessions = new ConcurrentLinkedQueue<NioTcpSession>();
+
+ private Selector selector;
+
+ // Lock for Selector worker, using default. can look into fairness later.
+ // We need to think about a lock less mechanism here.
+ private final Lock workerLock = new ReentrantLock();
+
+ /**
+ * {@inheritDoc}
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public void setStrategy(SelectorStrategy<?> strategy) {
+ this.strategy = (SelectorStrategy<NioSelectorProcessor>)strategy;
+ }
+
+ /**
+ * Add a bound server channel for starting accepting new client connections.
+ *
+ * @param serverChannel
+ */
+ private void add(final ServerSocketChannel serverChannel,
+ final IoServer server) {
+ LOGGER.debug("adding a server channel {} for server {}", serverChannel,
+ server);
+ this.serversToAdd.add(new Object[] { serverChannel, server });
+ this.wakeupWorker();
+ }
+
+ /**
+ * Wake the I/O worker thread and if none exists, create a new one FIXME :
+ * too much locking there ?
+ */
+ private void wakeupWorker() {
+ this.workerLock.lock();
+ try {
+ if (this.worker == null) {
+ this.worker = new SelectorWorker();
+ this.worker.start();
+ }
+ } finally {
+ this.workerLock.unlock();
+ }
+
+ if (this.selector != null) {
+ this.selector.wakeup();
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void bindTcpServer(final AbstractTcpServer server,
+ final SocketAddress address) throws IOException {
+ ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
+ serverSocketChannel.socket().setReuseAddress(server.isReuseAddress());
+ serverSocketChannel.socket().bind(address);
+ serverSocketChannel.configureBlocking(false);
+ this.serverSocketChannels.put(address, serverSocketChannel);
+ this.add(serverSocketChannel, server);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void bindUdpServer(AbstractUdpServer server, SocketAddress address)
+ throws IOException {
+ DatagramChannel datagramChannel = DatagramChannel.open();
+ datagramChannel.socket().setReuseAddress(server.isReuseAddress());
+ datagramChannel.socket().bind(address);
+ datagramChannel.configureBlocking(false);
+ this.datagramChannels.put(address, datagramChannel);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void unbind(final SocketAddress address) throws IOException {
+ ServerSocketChannel channel = this.serverSocketChannels.get(address);
+ channel.socket().close();
+ channel.close();
+ if (this.serverSocketChannels.remove(address) == null) {
+ LOGGER.warn(
+ "The server channel for address {} was already unbound",
+ address);
+ }
+ LOGGER.debug("Removing a server channel {}", channel);
+ this.serversToRemove.add(channel);
+ this.wakeupWorker();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Wraps the accepted channel in a {@link NioTcpSession}, copies the idle
+ * times and every non-null socket option of the service configuration to
+ * the session, then queues the session so the worker registers it.
+ */
+ @Override
+ public void createSession(final IoService service, final Object clientSocket)
+ throws SSLException {
+ LOGGER.debug("create session");
+ final SocketChannel clientChannel = (SocketChannel) clientSocket;
+ final TcpSessionConfig defaults = (TcpSessionConfig) service
+ .getSessionConfig();
+ final NioTcpSession created = new NioTcpSession(service, clientChannel,
+ this.strategy.getSelectorForNewSession(this));
+
+ try {
+ clientChannel.configureBlocking(false);
+ } catch (IOException ioe) {
+ LOGGER.error(
+ "Unexpected exception, while configuring socket as non blocking",
+ ioe);
+ throw new RuntimeIoException(
+ "cannot configure socket as non-blocking", ioe);
+ }
+
+ // the idle times are always copied
+ created.getConfig().setIdleTimeInMillis(IdleStatus.READ_IDLE,
+ defaults.getIdleTimeInMillis(IdleStatus.READ_IDLE));
+ created.getConfig().setIdleTimeInMillis(IdleStatus.WRITE_IDLE,
+ defaults.getIdleTimeInMillis(IdleStatus.WRITE_IDLE));
+
+ // the socket options are copied only when the service defines them
+ final Boolean keepAliveOption = defaults.isKeepAlive();
+ if (keepAliveOption != null) {
+ created.getConfig().setKeepAlive(keepAliveOption);
+ }
+
+ final Boolean oobInlineOption = defaults.isOobInline();
+ if (oobInlineOption != null) {
+ created.getConfig().setOobInline(oobInlineOption);
+ }
+
+ final Boolean reuseAddressOption = defaults.isReuseAddress();
+ if (reuseAddressOption != null) {
+ created.getConfig().setReuseAddress(reuseAddressOption);
+ }
+
+ final Boolean tcpNoDelayOption = defaults.isTcpNoDelay();
+ if (tcpNoDelayOption != null) {
+ created.getConfig().setTcpNoDelay(tcpNoDelayOption);
+ }
+
+ final Integer receiveBufferOption = defaults.getReceiveBufferSize();
+ if (receiveBufferOption != null) {
+ created.getConfig().setReceiveBufferSize(receiveBufferOption);
+ }
+
+ final Integer sendBufferOption = defaults.getSendBufferSize();
+ if (sendBufferOption != null) {
+ created.getConfig().setSendBufferSize(sendBufferOption);
+ }
+
+ final Integer trafficClassOption = defaults.getTrafficClass();
+ if (trafficClassOption != null) {
+ created.getConfig().setTrafficClass(trafficClassOption);
+ }
+
+ final Integer soLingerOption = defaults.getSoLinger();
+ if (soLingerOption != null) {
+ created.getConfig().setSoLinger(soLingerOption);
+ }
+
+ // prepare the session for SSL/TLS when the service is secured
+ if (defaults.isSecured()) {
+ created.initSecure(defaults.getSslContext());
+ }
+
+ // session created event
+ created.processSessionCreated();
+
+ // hand the session over to the worker, which registers it on the selector
+ this.sessionsToConnect.add(created);
+ this.wakeupWorker();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Queues the session among the sessions waiting to be registered for
+ * writing and wakes the worker up.
+ */
+ @Override
+ public void flush(final AbstractIoSession session) {
+ LOGGER.debug("scheduling session {} for writing", session);
+ final NioTcpSession tcpSession = (NioTcpSession) session;
+ // the worker registers the queued sessions for write events
+ this.flushingSessions.add(tcpSession);
+ // wake the selector so the I/O thread picks the request up
+ this.wakeupWorker();
+ }
+
+ /**
+ * The worker processing incoming session creation, session destruction
+ * requests, session write and reads. It will also bind new servers.
+ */
+ private class SelectorWorker extends Thread {
+ // map for finding the keys associated with a given server
+ private final Map<ServerSocketChannel, SelectionKey> serverKey = new HashMap<ServerSocketChannel, SelectionKey>();
+
+ // map for finding read keys associated with a given session
+ private final Map<NioTcpSession, SelectionKey> sessionReadKey = new HashMap<NioTcpSession, SelectionKey>();
+
+ /**
+ * Worker loop. Each turn applies the pending server removals and
+ * additions, the pending session connections and closures, selects,
+ * dispatches the ready keys, registers the flushing sessions for writing
+ * and checks the idle sessions. The loop ends once the selector has no
+ * registered key left; the worker reference is then cleared under the
+ * worker lock.
+ */
+ @Override
+ public void run() {
+ try {
+ if (NioSelectorProcessor.this.selector == null) {
+ LOGGER.debug("opening a new selector");
+
+ try {
+ NioSelectorProcessor.this.selector = Selector.open();
+ } catch (IOException e) {
+ LOGGER.error(
+ "IOException while opening a new Selector", e);
+ // without a selector the loop below could only fail
+ // with a NullPointerException
+ return;
+ }
+ }
+
+ for (;;) {
+ try {
+ // pop server sockets for removing
+ if (NioSelectorProcessor.this.serversToRemove.size() > 0) {
+ this.processServerRemove();
+ }
+
+ // pop new server sockets for accepting
+ if (NioSelectorProcessor.this.serversToAdd.size() > 0) {
+ this.processServerAdd();
+ }
+
+ // pop new session for starting read/write
+ if (NioSelectorProcessor.this.sessionsToConnect.size() > 0) {
+ this.processConnectSessions();
+ }
+
+ // pop session for close, if any
+ if (NioSelectorProcessor.this.sessionsToClose.size() > 0) {
+ this.processCloseSessions();
+ }
+
+ LOGGER.debug("selecting...");
+ int readyCount = NioSelectorProcessor.this.selector
+ .select(SELECT_TIMEOUT);
+ LOGGER.debug("... done selecting : {}", readyCount);
+
+ if (readyCount > 0) {
+ // process selected keys
+ Iterator<SelectionKey> selectedKeys = NioSelectorProcessor.this.selector
+ .selectedKeys().iterator();
+
+ // Loop on each SelectionKey and process any valid
+ // action
+ while (selectedKeys.hasNext()) {
+ SelectionKey key = selectedKeys.next();
+ // removing through the iterator takes the key out
+ // of the selected set, no second removal is needed
+ selectedKeys.remove();
+
+ if (!key.isValid()) {
+ continue;
+ }
+
+ if (key.isAcceptable()) {
+ this.processAccept(key);
+ }
+
+ // the key may have been cancelled by the previous
+ // step : the readiness tests throw
+ // CancelledKeyException on a cancelled key, which
+ // would end the worker
+ if (key.isValid() && key.isReadable()) {
+ this.processRead(key);
+ }
+
+ if (key.isValid() && key.isWritable()) {
+ this.processWrite(key);
+ }
+ }
+ }
+
+ // registering session with data in the write queue for
+ // writing
+ while (!NioSelectorProcessor.this.flushingSessions
+ .isEmpty()) {
+ this.processFlushSessions();
+ }
+ } catch (IOException e) {
+ LOGGER.error("IOException while selecting selector", e);
+ }
+
+ // stop the worker if needed
+ NioSelectorProcessor.this.workerLock.lock();
+
+ try {
+ if (NioSelectorProcessor.this.selector.keys().isEmpty()) {
+ NioSelectorProcessor.this.worker = null;
+ break;
+ }
+ } finally {
+ NioSelectorProcessor.this.workerLock.unlock();
+ }
+
+ // check for idle events
+ NioSelectorProcessor.this.idleChecker
+ .processIdleSession(System.currentTimeMillis());
+ }
+ } catch (Exception e) {
+ LOGGER.error("Unexpected exception : ", e);
+ }
+ }
+
+ /**
+ * Drains the queue of servers to remove and cancels the selection key
+ * recorded for each of them, if any.
+ */
+ private void processServerRemove() {
+ while (!NioSelectorProcessor.this.serversToRemove.isEmpty()) {
+ final ServerSocketChannel unbound = NioSelectorProcessor.this.serversToRemove
+ .poll();
+ final SelectionKey recorded = this.serverKey.remove(unbound);
+
+ if (recorded != null) {
+ LOGGER.debug("Removing the server from this selector : {}",
+ recorded);
+ recorded.cancel();
+ } else {
+ LOGGER.error("The server socket was already removed of the selector");
+ }
+ }
+ }
+
+ /**
+ * Drains the queue of servers to add : each server channel is registered
+ * on the selector for accept events, with the queued array as attachment
+ * (the channel is read from index 0, the accept handler reads the server
+ * from index 1).
+ */
+ private void processServerAdd() throws IOException {
+ while (!NioSelectorProcessor.this.serversToAdd.isEmpty()) {
+ Object[] tmp = NioSelectorProcessor.this.serversToAdd.poll();
+ ServerSocketChannel channel = (ServerSocketChannel) tmp[0];
+ SelectionKey key = channel.register(
+ NioSelectorProcessor.this.selector,
+ SelectionKey.OP_ACCEPT, tmp);
+ // remember the key, the server removal looks it up in this map
+ this.serverKey.put(channel, key);
+ LOGGER.debug("Accepted the server on this selector : {}", key);
+ }
+ }
+
+ /**
+ * Registers every pending session for read events on this selector. A
+ * session which is not secured is also marked as connected, gets its
+ * created and opened events fired and is declared to the idle checker.
+ */
+ private void processConnectSessions() throws IOException {
+ while (!NioSelectorProcessor.this.sessionsToConnect.isEmpty()) {
+ final NioTcpSession pending = NioSelectorProcessor.this.sessionsToConnect
+ .poll();
+ final SelectionKey readKey = pending.getSocketChannel().register(
+ NioSelectorProcessor.this.selector,
+ SelectionKey.OP_READ);
+ readKey.attach(pending);
+ this.sessionReadKey.put(pending, readKey);
+
+ // a secured session is not switched to CONNECTED here, as the
+ // SSL Handshake will occur later.
+ if (pending.isSecured()) {
+ continue;
+ }
+
+ pending.setConnected();
+
+ // fire the events
+ ((AbstractIoService) pending.getService())
+ .fireSessionCreated(pending);
+ pending.processSessionOpened();
+
+ // same timestamp for the read and the write idle tracking
+ final long now = System.currentTimeMillis();
+ NioSelectorProcessor.this.idleChecker.sessionRead(pending, now);
+ NioSelectorProcessor.this.idleChecker.sessionWritten(pending, now);
+ }
+ }
+
+ /**
+ * Drains the queue of sessions to close : the read key of the session is
+ * cancelled, its channel is closed and the closed/destroyed events are
+ * fired. A session without a recorded read key (for instance queued
+ * twice) is skipped, so the events are not fired a second time and the
+ * worker does not fail on it.
+ */
+ private void processCloseSessions() throws IOException {
+ while (!NioSelectorProcessor.this.sessionsToClose.isEmpty()) {
+ NioTcpSession session = NioSelectorProcessor.this.sessionsToClose
+ .poll();
+
+ SelectionKey key = this.sessionReadKey.remove(session);
+
+ if (key == null) {
+ LOGGER.debug(
+ "session {} has no read key on this selector, close request ignored",
+ session);
+ continue;
+ }
+
+ key.cancel();
+
+ // closing underlying socket
+ session.getSocketChannel().close();
+ // fire the event
+ session.processSessionClosed();
+ ((AbstractIoService) session.getService())
+ .fireSessionDestroyed(session);
+ }
+ }
+
+ /**
+ * Processes the Accept action for the given SelectionKey : accepts the
+ * pending connection of the server channel attached to the key and asks
+ * the processor selected by the strategy to create the session. Nothing
+ * is done when no connection is actually pending.
+ */
+ private void processAccept(final SelectionKey key) throws IOException {
+ LOGGER.debug("acceptable new client {}", key);
+ // the attachment is the { channel, server } array queued when binding
+ Object[] attachment = (Object[]) key.attachment();
+ ServerSocketChannel serverSocket = (ServerSocketChannel) attachment[0];
+ IoServer server = (IoServer) attachment[1];
+ // accepted connection
+ SocketChannel newClientChannel = serverSocket.accept();
+
+ if (newClientChannel == null) {
+ // a non-blocking accept returns null when no connection is pending
+ LOGGER.debug("no pending connection to accept");
+ return;
+ }
+
+ LOGGER.debug("client accepted");
+ // and give it to the strategy
+ NioSelectorProcessor.this.strategy.getSelectorForNewSession(
+ NioSelectorProcessor.this).createSession(server,
+ newClientChannel);
+ }
+
+ /**
+ * Processes the Read action for the given SelectionKey : reads the
+ * channel of the attached session into the shared read buffer. A negative
+ * read count schedules the session for closing; otherwise the buffer is
+ * handed to the SSL helper (secured session) or directly to the session,
+ * and the read is declared to the idle checker.
+ */
+ private void processRead(final SelectionKey key) throws IOException {
+ LOGGER.debug("readable client {}", key);
+ final NioTcpSession reader = (NioTcpSession) key.attachment();
+ final SocketChannel source = reader.getSocketChannel();
+ final ByteBuffer buffer = NioSelectorProcessor.this.readBuffer;
+ buffer.clear();
+ final int count = source.read(buffer);
+
+ LOGGER.debug("read {} bytes", count);
+
+ if (count < 0) {
+ // session closed by the remote peer
+ LOGGER.debug("session closed by the remote peer");
+ NioSelectorProcessor.this.sessionsToClose.add(reader);
+ return;
+ }
+
+ // make the bytes just read available from the start of the buffer
+ buffer.flip();
+
+ if (!reader.isSecured()) {
+ // Plain message, not encrypted : go directly to the chain
+ reader.processMessageReceived(buffer);
+ } else {
+ // SSL/TLS encrypted connection : the SslHelper processes the data
+ final SslHelper helper = reader.getAttribute(SSL_HELPER, null);
+
+ if (helper == null) {
+ throw new IllegalStateException();
+ }
+
+ helper.processRead(reader, buffer);
+ }
+
+ NioSelectorProcessor.this.idleChecker.sessionRead(reader,
+ System.currentTimeMillis());
+ }
+
+ /**
+ * Processes the Write action for the given SelectionKey.
+ * <p>
+ * The write requests queued on the session attached to the key are
+ * written to the session channel, one after the other, until the queue is
+ * empty or a buffer could not be written completely. The future of every
+ * completed request is completed.
+ * <p>
+ * When the queue ends up empty : a closing session gets its channel
+ * closed; otherwise the channel is registered again for read events only
+ * if a read key is known for the session, else its key on this selector
+ * is cancelled.
+ *
+ * @param key the writable key, its attachment is the session
+ * @throws IOException if writing, closing or registering the channel fails
+ */
+ private void processWrite(final SelectionKey key) throws IOException {
+ NioTcpSession session = (NioTcpSession) key.attachment();
+
+ LOGGER.debug("writable session : {}", session);
+
+ session.setNotRegisteredForWrite();
+
+ // write from the session write queue
+ boolean isEmpty = false; // stays false if the loop below throws
+
+ try {
+ Queue<WriteRequest> queue = session.acquireWriteQueue();
+
+ do {
+ // get a write request from the queue, without removing it yet
+ WriteRequest wreq = queue.peek();
+
+ if (wreq == null) {
+ break;
+ }
+
+ ByteBuffer buf = (ByteBuffer) wreq.getMessage();
+
+ // Note that if the connection is secured, the buffer
+ // already
+ // contains encrypted data.
+ int wrote = session.getSocketChannel().write(buf);
+ session.incrementWrittenBytes(wrote);
+ LOGGER.debug("wrote {} bytes to {}", wrote, session);
+
+ NioSelectorProcessor.this.idleChecker.sessionWritten(
+ session, System.currentTimeMillis());
+
+ if (buf.remaining() == 0) {
+ // completed write request, let's remove it
+ queue.remove();
+ // complete the future
+ DefaultWriteFuture future = (DefaultWriteFuture) wreq
+ .getFuture();
+
+ if (future != null) {
+ future.complete();
+ }
+ } else {
+ // output socket buffer is full, we need
+ // to give up until next selection for
+ // writing
+ break;
+ }
+ } while (!queue.isEmpty());
+
+ isEmpty = queue.isEmpty();
+ } finally {
+ session.releaseWriteQueue(); // always paired with acquireWriteQueue()
+ }
+
+ // if the session is no more interested in writing, we need
+ // to stop listening for OP_WRITE events
+ if (isEmpty) {
+ if (session.isClosing()) {
+ LOGGER.debug(
+ "closing session {} have empty write queue, so we close it",
+ session);
+ // the pending writes are flushed, the channel can now be closed
+ session.getSocketChannel().close();
+ } else {
+ // a key registered for read ? (because we can have a
+ // Selector for reads and another for the writes)
+ SelectionKey readKey = this.sessionReadKey.get(session);
+
+ if (readKey != null) {
+ LOGGER.debug("registering key for only reading");
+ SelectionKey mykey = session.getSocketChannel()
+ .register(NioSelectorProcessor.this.selector,
+ SelectionKey.OP_READ, session);
+ this.sessionReadKey.put(session, mykey);
+ } else {
+ LOGGER.debug("cancel key for writing");
+ session.getSocketChannel()
+ .keyFor(NioSelectorProcessor.this.selector)
+ .cancel();
+ }
+ }
+ }
+ }
+
+ /**
+ * Takes one session out of the flushing queue and registers its channel
+ * for write events on this selector; read events are kept when a read key
+ * is known for the session, and the new key is then recorded as its read
+ * key.
+ */
+ private void processFlushSessions() throws IOException {
+ final NioTcpSession flushing = NioSelectorProcessor.this.flushingSessions
+ .poll();
+ // a key registered for read ? (because we can have a
+ // Selector for reads and another for the writes)
+ final boolean alsoReads = this.sessionReadKey.get(flushing) != null;
+ final int interestOps = alsoReads ? SelectionKey.OP_READ | SelectionKey.OP_WRITE
+ : SelectionKey.OP_WRITE;
+
+ final SelectionKey registered = flushing.getSocketChannel().register(
+ NioSelectorProcessor.this.selector, interestOps, flushing);
+
+ if (alsoReads) {
+ this.sessionReadKey.put(flushing, registered);
+ }
+ }
+ }
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/nio/NioTcpServer.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/nio/NioTcpServer.java?rev=1341509&r1=1341508&r2=1341509&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/nio/NioTcpServer.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/nio/NioTcpServer.java Tue May 22 15:02:28 2012
@@ -21,8 +21,9 @@ package org.apache.mina.transport.tcp.ni
import java.io.IOException;
import java.net.SocketAddress;
-import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
import org.apache.mina.service.SelectorStrategy;
@@ -42,16 +43,17 @@ public class NioTcpServer extends Abstra
static final Logger LOG = LoggerFactory.getLogger(NioTcpServer.class);
// list of bound addresses
- private final Set<SocketAddress> addresses = Collections.synchronizedSet(new HashSet<SocketAddress>());
-
+ private final Map<SocketAddress /* bound address */,NioSelectorProcessor /* used processor */> addresses = new HashMap<SocketAddress, NioSelectorProcessor>();
+
// the strategy for dispatching servers and client to selector threads.
- private final SelectorStrategy strategy;
+ private final SelectorStrategy<NioSelectorProcessor> strategy;
+ // the default session configuration
private TcpSessionConfig config;
private boolean reuseAddress = false;
- public NioTcpServer(final SelectorStrategy strategy) {
+ public NioTcpServer(final SelectorStrategy<NioSelectorProcessor> strategy) {
super();
this.strategy = strategy;
this.config = new DefaultTcpSessionConfig();
@@ -89,24 +91,26 @@ public class NioTcpServer extends Abstra
* {@inheritDoc}
*/
@Override
- public void bind(final SocketAddress... localAddress) throws IOException {
+ public synchronized void bind(final SocketAddress... localAddress) throws IOException {
if (localAddress == null) {
// We should at least have one address to bind on
- throw new IllegalStateException("LocalAdress cannot be null");
+ throw new IllegalArgumentException("LocalAdress cannot be null");
}
for (SocketAddress address : localAddress) {
// check if the address is already bound
synchronized (this) {
- if (this.addresses.contains(address)) {
+ if (this.addresses.containsKey(address)) {
throw new IOException("address " + address + " already bound");
}
- LOG.debug("binding address {}", address);
-
- this.addresses.add(address);
- NioSelectorProcessor processor = (NioSelectorProcessor) this.strategy.getSelectorForBindNewAddress();
- processor.bindAndAcceptAddress(this, address);
+ LOG.info("binding address {}", address);
+ NioSelectorProcessor processor = this.strategy.getSelectorForBindNewAddress();
+
+ this.addresses.put(address,processor);
+
+ processor.bindTcpServer(this, address);
+
if (this.addresses.size() == 1) {
// it's the first address bound, let's fire the event
this.fireServiceActivated();
@@ -119,23 +123,21 @@ public class NioTcpServer extends Abstra
* {@inheritDoc}
*/
@Override
- public Set<SocketAddress> getLocalAddresses() {
- return this.addresses;
+ public synchronized Set<SocketAddress> getLocalAddresses() {
+ return new HashSet<SocketAddress>(addresses.keySet());
}
/**
* {@inheritDoc}
*/
@Override
- public void unbind(final SocketAddress... localAddresses) throws IOException {
+ public synchronized void unbind(final SocketAddress... localAddresses) throws IOException {
for (SocketAddress socketAddress : localAddresses) {
- LOG.debug("unbinding {}", socketAddress);
- synchronized (this) {
- this.strategy.unbind(socketAddress);
- this.addresses.remove(socketAddress);
- if (this.addresses.isEmpty()) {
- this.fireServiceInactivated();
- }
+ LOG.info("unbinding {}", socketAddress);
+ addresses.get(socketAddress).unbind(socketAddress);
+ this.addresses.remove(socketAddress);
+ if (this.addresses.isEmpty()) {
+ this.fireServiceInactivated();
}
}
}
@@ -144,10 +146,12 @@ public class NioTcpServer extends Abstra
* {@inheritDoc}
*/
@Override
- public void unbindAll() throws IOException {
- for (SocketAddress socketAddress : this.addresses) {
- this.unbind(socketAddress);
- }
+ public synchronized void unbindAll() throws IOException {
+ LOG.info("unbinding all");
+ // iterate on a snapshot : unbind() removes the address from the map and fires the inactivation event
+ for (SocketAddress address : new HashSet<SocketAddress>(this.addresses.keySet())) {
+ this.unbind(address);
+ }
 }
}
\ No newline at end of file
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpServer.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpServer.java?rev=1341509&r1=1341508&r2=1341509&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpServer.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpServer.java Tue May 22 15:02:28 2012
@@ -19,10 +19,6 @@
*/
package org.apache.mina.transport.udp;
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.util.Set;
-
import javax.net.ssl.SSLException;
import org.apache.mina.api.IoSession;
@@ -41,34 +37,22 @@ public abstract class AbstractUdpServer
super();
}
- @Override
- public Set<SocketAddress> getLocalAddresses() {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public void bind(SocketAddress... localAddress) throws IOException {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void unbindAll() throws IOException {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void unbind(SocketAddress... localAddresses) throws IOException {
- // TODO Auto-generated method stub
-
- }
-
/**
* {@inheritDoc}
*/
public void initSecured(IoSession session) throws SSLException {
- // Do nothing : UDP does not support SSL
+ throw new RuntimeException("SSL is not supported for UDP");
}
+
+ /**
+ * Set the reuse address flag on the server socket
+ * @param reuseAddress <code>true</code> to enable
+ */
+ public abstract void setReuseAddress(boolean reuseAddress);
+
+ /**
+ * Is the reuse address enabled for this server.
+ * @return <code>true</code> if the reuse address flag is enabled
+ */
+ public abstract boolean isReuseAddress();
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/udp/nio/NioUdpServer.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/udp/nio/NioUdpServer.java?rev=1341509&r1=1341508&r2=1341509&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/udp/nio/NioUdpServer.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/udp/nio/NioUdpServer.java Tue May 22 15:02:28 2012
@@ -19,6 +19,7 @@
*/
package org.apache.mina.transport.udp.nio;
+import java.io.IOException;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.HashSet;
@@ -26,6 +27,7 @@ import java.util.Set;
import org.apache.mina.api.IoSessionConfig;
import org.apache.mina.service.SelectorStrategy;
+import org.apache.mina.transport.tcp.NioSelectorProcessor;
import org.apache.mina.transport.udp.AbstractUdpServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,6 +46,8 @@ public class NioUdpServer extends Abstra
// the strategy for dispatching servers and client to selector threads.
private final SelectorStrategy strategy;
+
+ private boolean reuseAddress = false;
/**
* Create a new instance of NioUdpServer
@@ -61,4 +65,77 @@ public class NioUdpServer extends Abstra
// TODO Auto-generated method stub
return null;
}
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Set<SocketAddress> getLocalAddresses() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Every address is bound through the processor given by the strategy and
+ * recorded as bound only once that bind succeeded. The service activated
+ * event is fired when the first address gets bound.
+ */
+ @Override
+ public void bind(final SocketAddress... localAddress) throws IOException {
+ if (localAddress == null) {
+ // We should at least have one address to bind on
+ throw new IllegalArgumentException("LocalAdress cannot be null");
+ }
+
+ for (SocketAddress address : localAddress) {
+ // check if the address is already bound
+ synchronized (this) {
+ if (this.addresses.contains(address)) {
+ throw new IOException("address " + address + " already bound");
+ }
+
+ LOG.debug("binding address {}", address);
+
+ NioSelectorProcessor processor = (NioSelectorProcessor) this.strategy.getSelectorForBindNewAddress();
+ processor.bindUdpServer(this, address);
+ // recorded after the bind : a failed bind must not leave the address marked as bound
+ this.addresses.add(address);
+ if (this.addresses.size() == 1) {
+ // it's the first address bound, let's fire the event
+ this.fireServiceActivated();
+ }
+ }
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void unbindAll() throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void unbind(SocketAddress... localAddresses) throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void setReuseAddress(boolean reuseAddress) {
+ this.reuseAddress = reuseAddress;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean isReuseAddress() {
+ return this.reuseAddress;
+ }
}
Modified: mina/trunk/examples/src/main/java/org/apache/mina/examples/echoserver/NioEchoServer.java
URL: http://svn.apache.org/viewvc/mina/trunk/examples/src/main/java/org/apache/mina/examples/echoserver/NioEchoServer.java?rev=1341509&r1=1341508&r2=1341509&view=diff
==============================================================================
--- mina/trunk/examples/src/main/java/org/apache/mina/examples/echoserver/NioEchoServer.java (original)
+++ mina/trunk/examples/src/main/java/org/apache/mina/examples/echoserver/NioEchoServer.java Tue May 22 15:02:28 2012
@@ -34,7 +34,6 @@ import org.apache.mina.filter.logging.Lo
import org.apache.mina.filterchain.ReadFilterChainController;
import org.apache.mina.filterchain.WriteFilterChainController;
import org.apache.mina.service.OneThreadSelectorStrategy;
-import org.apache.mina.service.SelectorFactory;
import org.apache.mina.transport.tcp.NioSelectorProcessor;
import org.apache.mina.transport.tcp.nio.NioTcpServer;
import org.slf4j.Logger;
@@ -53,8 +52,8 @@ public class NioEchoServer {
public static void main(String[] args) {
LOG.info("starting echo server");
- OneThreadSelectorStrategy strategy = new OneThreadSelectorStrategy(new SelectorFactory(
- NioSelectorProcessor.class));
+ OneThreadSelectorStrategy<NioSelectorProcessor> strategy = new OneThreadSelectorStrategy<NioSelectorProcessor>(new NioSelectorProcessor());
+
NioTcpServer acceptor = new NioTcpServer(strategy);
// create the fitler chain for this service
Modified: mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpTest.java?rev=1341509&r1=1341508&r2=1341509&view=diff
==============================================================================
--- mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpTest.java (original)
+++ mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpTest.java Tue May 22 15:02:28 2012
@@ -39,7 +39,6 @@ import org.apache.mina.http.api.HttpRequ
import org.apache.mina.http.api.HttpStatus;
import org.apache.mina.http.api.HttpVersion;
import org.apache.mina.service.OneThreadSelectorStrategy;
-import org.apache.mina.service.SelectorFactory;
import org.apache.mina.transport.tcp.NioSelectorProcessor;
import org.apache.mina.transport.tcp.nio.NioTcpServer;
import org.slf4j.Logger;
@@ -51,8 +50,8 @@ public class HttpTest {
public static void main(String[] args) throws Exception {
- OneThreadSelectorStrategy strategy = new OneThreadSelectorStrategy(new SelectorFactory(
- NioSelectorProcessor.class));
+ OneThreadSelectorStrategy<NioSelectorProcessor> strategy = new OneThreadSelectorStrategy<NioSelectorProcessor>(new NioSelectorProcessor());
+
NioTcpServer acceptor = new NioTcpServer(strategy);
acceptor.setFilters(new LoggingFilter("INCOMING"), new HttpServerCodec(), new LoggingFilter("DECODED"),
new DummyHttpSever());
Modified: mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpsTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpsTest.java?rev=1341509&r1=1341508&r2=1341509&view=diff
==============================================================================
--- mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpsTest.java (original)
+++ mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpsTest.java Tue May 22 15:02:28 2012
@@ -39,7 +39,6 @@ import org.apache.mina.http.api.HttpRequ
import org.apache.mina.http.api.HttpStatus;
import org.apache.mina.http.api.HttpVersion;
import org.apache.mina.service.OneThreadSelectorStrategy;
-import org.apache.mina.service.SelectorFactory;
import org.apache.mina.transport.tcp.NioSelectorProcessor;
import org.apache.mina.transport.tcp.nio.NioTcpServer;
import org.slf4j.Logger;
@@ -51,8 +50,7 @@ public class HttpsTest {
public static void main(String[] args) throws Exception {
- OneThreadSelectorStrategy strategy = new OneThreadSelectorStrategy(new SelectorFactory(
- NioSelectorProcessor.class));
+ OneThreadSelectorStrategy<NioSelectorProcessor> strategy = new OneThreadSelectorStrategy<NioSelectorProcessor>(new NioSelectorProcessor());
NioTcpServer acceptor = new NioTcpServer(strategy);
acceptor.setFilters(new LoggingFilter("INCOMING"), new HttpServerCodec(), new LoggingFilter("DECODED"),
Modified: mina/trunk/examples/src/main/java/org/apache/mina/examples/ldap/LdapTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/examples/src/main/java/org/apache/mina/examples/ldap/LdapTest.java?rev=1341509&r1=1341508&r2=1341509&view=diff
==============================================================================
--- mina/trunk/examples/src/main/java/org/apache/mina/examples/ldap/LdapTest.java (original)
+++ mina/trunk/examples/src/main/java/org/apache/mina/examples/ldap/LdapTest.java Tue May 22 15:02:28 2012
@@ -40,7 +40,6 @@ import org.apache.mina.filter.logging.Lo
import org.apache.mina.filterchain.ReadFilterChainController;
import org.apache.mina.ldap.LdapCodec;
import org.apache.mina.service.OneThreadSelectorStrategy;
-import org.apache.mina.service.SelectorFactory;
import org.apache.mina.transport.tcp.NioSelectorProcessor;
import org.apache.mina.transport.tcp.nio.NioTcpServer;
import org.slf4j.Logger;
@@ -54,8 +53,7 @@ public class LdapTest {
public static void main(String[] args) throws Exception {
LdapTest ldapServer = new LdapTest();
- OneThreadSelectorStrategy strategy = new OneThreadSelectorStrategy(new SelectorFactory(
- NioSelectorProcessor.class));
+ OneThreadSelectorStrategy<NioSelectorProcessor> strategy = new OneThreadSelectorStrategy<NioSelectorProcessor>(new NioSelectorProcessor());
NioTcpServer acceptor = new NioTcpServer(strategy);
acceptor.setFilters(new LoggingFilter("INCOMING"), new LdapCodec(), new LoggingFilter("DECODED"),
ldapServer.new DummyLdapSever());
@@ -113,7 +111,7 @@ public class LdapTest {
*/
private void handle(IoSession session, BindRequest bindRequest) {
// Build a faked BindResponse
- BindResponse response = (BindResponse) bindRequest.getResultResponse();
+ BindResponse response = bindRequest.getResultResponse();
response.getLdapResult().setResultCode(ResultCodeEnum.SUCCESS);
session.write(response);