You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by jv...@apache.org on 2012/09/30 17:35:06 UTC
svn commit: r1392037 - in /mina/trunk:
core/src/main/java/org/apache/mina/session/
core/src/main/java/org/apache/mina/transport/nio/
core/src/test/java/org/apache/mina/service/idlecheker/
core/src/test/java/org/apache/mina/session/ examples/src/main/ja...
Author: jvermillard
Date: Sun Sep 30 15:35:05 2012
New Revision: 1392037
URL: http://svn.apache.org/viewvc?rev=1392037&view=rev
Log:
major rework of NioSelector
Added:
mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java (with props)
mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorListener.java (with props)
mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorLoop.java (with props)
Removed:
mina/trunk/core/src/main/java/org/apache/mina/transport/nio/AbstractNioSession.java
mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSelectorProcessor.java
mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorEventListener.java
Modified:
mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpServer.java
mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java
mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpServer.java
mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpSession.java
mina/trunk/core/src/test/java/org/apache/mina/service/idlecheker/IndexedIdleChekerTest.java
mina/trunk/core/src/test/java/org/apache/mina/session/AbstractIoSessionTest.java
mina/trunk/examples/src/main/java/org/apache/mina/examples/echoserver/NioEchoServer.java
mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpTest.java
mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpsTest.java
mina/trunk/examples/src/main/java/org/apache/mina/examples/ldap/LdapTest.java
mina/trunk/examples/src/main/java/org/apache/mina/examples/udpecho/NioUdpEchoServer.java
Modified: mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java?rev=1392037&r1=1392036&r2=1392037&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java Sun Sep 30 15:35:05 2012
@@ -37,10 +37,12 @@ import org.apache.mina.api.IoFilter;
import org.apache.mina.api.IoFuture;
import org.apache.mina.api.IoService;
import org.apache.mina.api.IoSession;
+import org.apache.mina.api.RuntimeIoException;
import org.apache.mina.filterchain.ReadFilterChainController;
import org.apache.mina.filterchain.WriteFilterChainController;
import org.apache.mina.service.SelectorProcessor;
import org.apache.mina.service.idlechecker.IdleChecker;
+import org.apache.mina.util.AbstractIoFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,9 +70,6 @@ public abstract class AbstractIoSession
/** attributes map */
private final AttributeContainer attributes = new DefaultAttributeContainer();
- /** The {@link SelectorProcessor} used for handling this session writing */
- protected SelectorProcessor writeProcessor;
-
/** the {@link IdleChecker} in charge of detecting idle event for this session */
protected final IdleChecker idleChecker;
@@ -149,14 +148,13 @@ public abstract class AbstractIoSession
* {@link org.apache.mina.api.IoSession#getId()}) and an associated {@link IoService}
*
* @param service the service this session is associated with
- * @param writeProcessor the processor in charge of processing this session write queue
+ * @param selectorLoop the selector loop in charge of processing this session read/write events
*/
- public AbstractIoSession(IoService service, SelectorProcessor writeProcessor, IdleChecker idleChecker) {
+ public AbstractIoSession(IoService service, IdleChecker idleChecker) {
// generated a unique id
id = NEXT_ID.getAndIncrement();
creationTime = System.currentTimeMillis();
this.service = service;
- this.writeProcessor = writeProcessor;
this.chain = service.getFilters();
this.idleChecker = idleChecker;
@@ -498,6 +496,10 @@ public abstract class AbstractIoSession
return attributes.removeAttribute(key);
}
+ //----------------------------------------------------
+ // Write management
+ //----------------------------------------------------
+
/**
* {@inheritDoc}
*/
@@ -561,12 +563,14 @@ public abstract class AbstractIoSession
// If it wasn't, we register this session as interested to write.
// It's done in atomic fashion for avoiding two concurrent registering.
if (!registeredForWrite.getAndSet(true)) {
- writeProcessor.flush(this);
+ flushWriteQueue();
}
return request;
}
+ public abstract void flushWriteQueue();
+
public void setNotRegisteredForWrite() {
registeredForWrite.set(false);
}
@@ -589,6 +593,60 @@ public abstract class AbstractIoSession
}
//------------------------------------------------------------------------
+ // Close session management
+ //------------------------------------------------------------------------
+
+ /** we pre-allocate a close future for lock-less {@link #close(boolean)} */
+ private final IoFuture<Void> closeFuture = new AbstractIoFuture<Void>() {
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected boolean cancelOwner(boolean mayInterruptIfRunning) {
+ // we don't cancel close
+ return false;
+ }
+ };
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public IoFuture<Void> close(boolean immediately) {
+ switch (state) {
+ case CREATED:
+ LOG.error("Session {} not opened", this);
+ throw new RuntimeIoException("cannot close an not opened session");
+ case CONNECTED:
+ state = SessionState.CLOSING;
+ if (immediately) {
+ channelClose();
+ } else {
+ // flush this session the flushing code will close the session
+ flushWriteQueue();
+ }
+ break;
+ case CLOSING:
+ // return the same future
+ LOG.warn("Already closing session {}", this);
+ break;
+ case CLOSED:
+ LOG.warn("Already closed session {}", this);
+ break;
+ default:
+ throw new RuntimeIoException("not implemented session state : " + state);
+ }
+
+ return closeFuture;
+ }
+
+ /**
+ * Close the inner socket channel
+ */
+ protected abstract void channelClose();
+
+ //------------------------------------------------------------------------
// Event processing using the filter chain
//------------------------------------------------------------------------
Added: mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java?rev=1392037&view=auto
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java Sun Sep 30 15:35:05 2012
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.mina.transport.nio;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+
+import org.apache.mina.api.RuntimeIoException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ */
+public class NioSelectorLoop implements SelectorLoop {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(NioSelectorLoop.class);
+
+ /**
+ * A timeout used for the select, as we need to get out to deal with idle
+ * sessions
+ */
+ private static final long SELECT_TIMEOUT = 1000L;
+
+ /** the selector managed by this class */
+ private Selector selector;
+
+ /** the worker thread in charge of polling the selector */
+ private final SelectorWorker worker = new SelectorWorker();
+
+ /** the number of service using this selector */
+ private int serviceCount = 0;
+
+ /** Read buffer for all the incoming bytes (default to 64Kb) */
+ private final ByteBuffer readBuffer = ByteBuffer.allocate(64 * 1024);
+
+ public NioSelectorLoop() {
+ try {
+ selector = Selector.open();
+ } catch (IOException ioe) {
+ LOGGER.error("Impossible to open a new NIO selector, O/S is out of file descriptor ?");
+ throw new RuntimeIoException(ioe);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void register(final boolean accept, final boolean read, final boolean write,
+ final SelectorListener listener, SelectableChannel channel) {
+ LOGGER.debug("adding to registration queue : {} for accept : {}, read : {}, write : {}", new Object[] {
+ listener, accept, read, write });
+ int ops = 0;
+ if (accept) {
+ ops |= SelectionKey.OP_ACCEPT;
+ }
+ if (read) {
+ ops |= SelectionKey.OP_READ;
+ }
+ if (write) {
+ ops |= SelectionKey.OP_WRITE;
+ }
+ try {
+ channel.register(selector, ops, listener);
+ } catch (ClosedChannelException e) {
+ LOGGER.error("Trying to register an already closed channel : ", e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void modifyRegistration(final boolean accept, final boolean read, final boolean write,
+ final SelectorListener listener, SelectableChannel channel) {
+ LOGGER.debug("modifying registration : {} for accept : {}, read : {}, write : {}", new Object[] { listener,
+ accept, read, write });
+
+ SelectionKey key = channel.keyFor(selector);
+ if (key == null) {
+ LOGGER.error("Trying to modify the registration of a not registered channel");
+ return;
+ }
+
+ int ops = 0;
+ if (accept) {
+ ops |= SelectionKey.OP_ACCEPT;
+ }
+ if (read) {
+ ops |= SelectionKey.OP_READ;
+ }
+ if (write) {
+ ops |= SelectionKey.OP_WRITE;
+ }
+ key.interestOps(ops);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void unregister(final SelectorListener listener, SelectableChannel channel) {
+ LOGGER.debug("unregistering : {}", listener);
+ SelectionKey key = channel.keyFor(selector);
+ if (key == null) {
+ LOGGER.error("Trying to modify the registration of a not registered channel");
+ return;
+ }
+ key.cancel();
+ key.attach(null);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public synchronized void incrementServiceCount() {
+ serviceCount++;
+ LOGGER.debug("service count: {}", serviceCount);
+ if (serviceCount == 1) {
+ worker.start();
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public synchronized void decrementServiceCount() {
+ serviceCount--;
+ LOGGER.debug("service count: {}", serviceCount);
+ if (serviceCount < 0) {
+ LOGGER.error("service count should not be negative : bug ?");
+ }
+ }
+
+ /**
+ * The worker processing incoming session creation, session destruction
+ * requests, session write and reads. It will also bind new servers.
+ */
+ private class SelectorWorker extends Thread {
+
+ @Override
+ public void run() {
+ if (selector == null) {
+ LOGGER.debug("opening a new selector");
+
+ try {
+ selector = Selector.open();
+ } catch (IOException e) {
+ LOGGER.error("IOException while opening a new Selector", e);
+ }
+ }
+
+ for (;;) {
+ try {
+ LOGGER.debug("selecting...");
+ int readyCount = selector.select(SELECT_TIMEOUT);
+ LOGGER.debug("... done selecting : {}", readyCount);
+ if (readyCount > 0) {
+ for (SelectionKey key : selector.selectedKeys()) {
+ SelectorListener listener = (SelectorListener) key.attachment();
+ listener.ready(key.isAcceptable(), key.isReadable(), key.isReadable() ? readBuffer : null,
+ key.isWritable());
+ }
+ }
+
+ } catch (Exception e) {
+ LOGGER.error("Unexpected exception : ", e);
+ }
+
+ // stop the worker if needed (no more service)
+ synchronized (NioSelectorLoop.this) {
+ LOGGER.debug("remaing {} service", serviceCount);
+ if (serviceCount <= 0) {
+ LOGGER.debug("stop the worker");
+ break;
+ }
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
Propchange: mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpServer.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpServer.java?rev=1392037&r1=1392036&r2=1392037&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpServer.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpServer.java Sun Sep 30 15:35:05 2012
@@ -26,8 +26,11 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
-import org.apache.mina.service.SelectorStrategy;
+import org.apache.mina.api.IdleStatus;
+import org.apache.mina.service.idlechecker.IdleChecker;
+import org.apache.mina.service.idlechecker.IndexedIdleChecker;
import org.apache.mina.transport.tcp.AbstractTcpServer;
+import org.apache.mina.transport.tcp.TcpSessionConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,16 +39,15 @@ import org.slf4j.LoggerFactory;
*
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
-public class NioTcpServer extends AbstractTcpServer implements SelectorEventListener {
+public class NioTcpServer extends AbstractTcpServer implements SelectorListener {
static final Logger LOG = LoggerFactory.getLogger(NioTcpServer.class);
- // the strategy for dispatching servers and client to selector threads.
- private final SelectorStrategy<NioSelectorProcessor> strategy;
-
// the bound local address
private SocketAddress address = null;
- private NioSelectorProcessor acceptProcessor = null;
+ private final SelectorLoop acceptSelectorLoop;
+
+ private final SelectorLoop readWriteSelectorLoop;
// the key used for selecting accept event
private SelectionKey acceptKey = null;
@@ -53,9 +55,12 @@ public class NioTcpServer extends Abstra
// the server socket for accepting clients
private ServerSocketChannel serverChannel = null;
- public NioTcpServer(final SelectorStrategy<NioSelectorProcessor> strategy) {
+ private final IdleChecker idleChecker = new IndexedIdleChecker();
+
+ public NioTcpServer(final SelectorLoop acceptSelectorLoop, SelectorLoop readWriteSelectorLoop) {
super();
- this.strategy = strategy;
+ this.acceptSelectorLoop = acceptSelectorLoop;
+ this.readWriteSelectorLoop = readWriteSelectorLoop;
}
/**
@@ -93,12 +98,13 @@ public class NioTcpServer extends Abstra
serverChannel.socket().bind(address);
serverChannel.configureBlocking(false);
- acceptProcessor = this.strategy.getSelectorForBindNewAddress();
-
- acceptProcessor.addServer(this);
+ acceptSelectorLoop.register(true, false, false, this, serverChannel);
// it's the first address bound, let's fire the event
this.fireServiceActivated();
+
+ // will start the selector processor if we are the first service
+ acceptSelectorLoop.incrementServiceCount();
}
/**
@@ -120,10 +126,13 @@ public class NioTcpServer extends Abstra
}
serverChannel.socket().close();
serverChannel.close();
- acceptProcessor.removeServer(this);
+ acceptSelectorLoop.unregister(this, serverChannel);
this.address = null;
this.fireServiceInactivated();
+
+ // will stop the acceptor processor if we are the last service
+ acceptSelectorLoop.decrementServiceCount();
}
/**
@@ -144,30 +153,99 @@ public class NioTcpServer extends Abstra
* {@inheritDoc}
*/
@Override
- public void acceptReady(NioSelectorProcessor processor) throws IOException {
- LOG.debug("acceptable new client");
+ public void ready(boolean accept, boolean read, ByteBuffer readBuffer, boolean write) {
+ if (accept) {
+ LOG.debug("acceptable new client");
+
+ // accepted connection
+ try {
+ LOG.debug("new client accepted");
+ createSession(getServerSocketChannel().accept());
+
+ } catch (IOException e) {
+ LOG.error("error while accepting new client", e);
+ }
+ }
+ if (read || write) {
+ throw new IllegalStateException("should not receive read or write events");
+ }
+ }
- // accepted connection
- SocketChannel newClientChannel = getServerSocketChannel().accept();
- LOG.debug("client accepted");
+ private void createSession(final SocketChannel clientSocket) throws IOException {
+ LOG.debug("create session");
+ final SocketChannel socketChannel = clientSocket;
+ final TcpSessionConfig config = getSessionConfig();
+ final NioTcpSession session = new NioTcpSession(this, socketChannel, readWriteSelectorLoop, idleChecker);
+
+ socketChannel.configureBlocking(false);
+
+ // apply idle configuration
+ session.getConfig().setIdleTimeInMillis(IdleStatus.READ_IDLE, config.getIdleTimeInMillis(IdleStatus.READ_IDLE));
+ session.getConfig().setIdleTimeInMillis(IdleStatus.WRITE_IDLE,
+ config.getIdleTimeInMillis(IdleStatus.WRITE_IDLE));
- // and give it's to the strategy
- strategy.getSelectorForNewSession(processor).createSession(this, newClientChannel);
- }
+ // apply the default service socket configuration
+ Boolean keepAlive = config.isKeepAlive();
- /**
- * {@inheritDoc}
- */
- @Override
- public void readReady(NioSelectorProcessor processor, ByteBuffer readBuffer) {
- throw new IllegalStateException("read event should never occur on NioTcpServer");
- }
+ if (keepAlive != null) {
+ session.getConfig().setKeepAlive(keepAlive);
+ }
- /**
- * {@inheritDoc}
- */
- @Override
- public void writeReady(NioSelectorProcessor processor) {
- throw new IllegalStateException("write event should never occur on NioTcpServer");
+ Boolean oobInline = config.isOobInline();
+
+ if (oobInline != null) {
+ session.getConfig().setOobInline(oobInline);
+ }
+
+ Boolean reuseAddress = config.isReuseAddress();
+
+ if (reuseAddress != null) {
+ session.getConfig().setReuseAddress(reuseAddress);
+ }
+
+ Boolean tcpNoDelay = config.isTcpNoDelay();
+
+ if (tcpNoDelay != null) {
+ session.getConfig().setTcpNoDelay(tcpNoDelay);
+ }
+
+ Integer receiveBufferSize = config.getReceiveBufferSize();
+
+ if (receiveBufferSize != null) {
+ session.getConfig().setReceiveBufferSize(receiveBufferSize);
+ }
+
+ Integer sendBufferSize = config.getSendBufferSize();
+
+ if (sendBufferSize != null) {
+ session.getConfig().setSendBufferSize(sendBufferSize);
+ }
+
+ Integer trafficClass = config.getTrafficClass();
+
+ if (trafficClass != null) {
+ session.getConfig().setTrafficClass(trafficClass);
+ }
+
+ Integer soLinger = config.getSoLinger();
+
+ if (soLinger != null) {
+ session.getConfig().setSoLinger(soLinger);
+ }
+
+ // Set the secured flag if the service is to be used over SSL/TLS
+ if (config.isSecured()) {
+ session.initSecure(config.getSslContext());
+ }
+
+ // event session created
+ session.processSessionCreated();
+
+ // add the session to the queue for being added to the selector
+ readWriteSelectorLoop.register(false, true, false, session, socketChannel);
+ readWriteSelectorLoop.incrementServiceCount();
+ session.processSessionOpened();
+ session.setConnected();
}
+
}
\ No newline at end of file
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java?rev=1392037&r1=1392036&r2=1392037&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java Sun Sep 30 15:35:05 2012
@@ -26,16 +26,14 @@ import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Queue;
-import org.apache.mina.api.IoFuture;
import org.apache.mina.api.IoService;
-import org.apache.mina.api.RuntimeIoException;
import org.apache.mina.service.idlechecker.IdleChecker;
+import org.apache.mina.session.AbstractIoSession;
import org.apache.mina.session.DefaultWriteFuture;
import org.apache.mina.session.SslHelper;
import org.apache.mina.session.WriteRequest;
import org.apache.mina.transport.tcp.ProxyTcpSessionConfig;
import org.apache.mina.transport.tcp.TcpSessionConfig;
-import org.apache.mina.util.AbstractIoFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,32 +44,23 @@ import org.slf4j.LoggerFactory;
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*
*/
-public class NioTcpSession extends AbstractNioSession implements SelectorEventListener {
+public class NioTcpSession extends AbstractIoSession implements SelectorListener {
private static final Logger LOG = LoggerFactory.getLogger(NioTcpSession.class);
/** the NIO socket channel for this TCP session */
private final SocketChannel channel;
+ /** the selector loop in charge of generating read/write events for this session */
+ private final SelectorLoop selectorLoop;
+
/** the socket configuration */
private final TcpSessionConfig configuration;
- /** we pre-allocate a close future for lock-less {@link #close(boolean)} */
- private final IoFuture<Void> closeFuture = new AbstractIoFuture<Void>() {
-
- /**
- * {@inheritDoc}
- */
- @Override
- protected boolean cancelOwner(boolean mayInterruptIfRunning) {
- // we don't cancel close
- return false;
- }
- };
-
- NioTcpSession(IoService service, SocketChannel channel, NioSelectorProcessor writeProcessor, IdleChecker idleChecker) {
- super(service, writeProcessor, idleChecker);
+ NioTcpSession(IoService service, SocketChannel channel, SelectorLoop selectorLoop, IdleChecker idleChecker) {
+ super(service, idleChecker);
this.channel = channel;
+ this.selectorLoop = selectorLoop;
this.configuration = new ProxyTcpSessionConfig(channel.socket());
}
@@ -122,42 +111,6 @@ public class NioTcpSession extends Abstr
* {@inheritDoc}
*/
@Override
- public IoFuture<Void> close(boolean immediately) {
- switch (state) {
- case CREATED:
- LOG.error("Session {} not opened", this);
- throw new RuntimeIoException("cannot close an not opened session");
- case CONNECTED:
- state = SessionState.CLOSING;
- if (immediately) {
- try {
- channel.close();
- } catch (IOException e) {
- throw new RuntimeIoException(e);
- }
- } else {
- // flush this session the flushing code will close the session
- writeProcessor.flush(this);
- }
- break;
- case CLOSING:
- // return the same future
- LOG.warn("Already closing session {}", this);
- break;
- case CLOSED:
- LOG.warn("Already closed session {}", this);
- break;
- default:
- throw new RuntimeIoException("not implemented session state : " + state);
- }
-
- return closeFuture;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
public void suspendRead() {
// TODO
throw new RuntimeException("Not implemented");
@@ -196,7 +149,7 @@ public class NioTcpSession extends Abstr
@Override
public boolean isReadSuspended() {
// TODO
- throw new RuntimeException("Not implemented");
+ return false;
}
/**
@@ -231,120 +184,144 @@ public class NioTcpSession extends Abstr
* {@inheritDoc}
*/
@Override
- public void acceptReady(NioSelectorProcessor processor) {
- // should never happen
- throw new IllegalStateException("accept event should never occur on NioTcpSession");
+ protected void channelClose() {
+ try {
+ selectorLoop.unregister(this, channel);
+ selectorLoop.decrementServiceCount();
+ channel.close();
+ } catch (IOException e) {
+ LOG.error("Exception while closing the channel : ", e);
+ }
}
/**
* {@inheritDoc}
*/
@Override
- public void readReady(NioSelectorProcessor processor, ByteBuffer readBuffer) throws IOException {
- LOG.debug("readable session : {}", this);
- readBuffer.clear();
- int readCount = channel.read(readBuffer);
-
- LOG.debug("read {} bytes", readCount);
-
- if (readCount < 0) {
- // session closed by the remote peer
- LOG.debug("session closed by the remote peer");
- processor.addSessionToClose(this);
- } else {
- // we have read some data
- // limit at the current position & rewind buffer back to start &
- // push to the chain
- readBuffer.flip();
-
- if (isSecured()) {
- // We are reading data over a SSL/TLS encrypted connection.
- // Redirect
- // the processing to the SslHelper class.
- SslHelper sslHelper = getAttribute(SSL_HELPER, null);
-
- if (sslHelper == null) {
- throw new IllegalStateException();
- }
-
- sslHelper.processRead(this, readBuffer);
- } else {
- // Plain message, not encrypted : go directly to the chain
- processMessageReceived(readBuffer);
- }
-
- idleChecker.sessionRead(this, System.currentTimeMillis());
- }
-
+ public void flushWriteQueue() {
+ // register for write
+ selectorLoop.modifyRegistration(false, !isReadSuspended(), true, this, channel);
}
/**
* {@inheritDoc}
*/
@Override
- public void writeReady(NioSelectorProcessor processor) throws IOException {
- LOG.debug("writable session : {}", this);
+ public void ready(boolean accept, boolean read, ByteBuffer readBuffer, boolean write) {
+ if (read) {
+ try {
- setNotRegisteredForWrite();
+ LOG.debug("readable session : {}", this);
+ readBuffer.clear();
+ int readCount = channel.read(readBuffer);
- // write from the session write queue
- boolean isEmpty = false;
+ LOG.debug("read {} bytes", readCount);
- try {
- Queue<WriteRequest> queue = acquireWriteQueue();
-
- do {
- // get a write request from the queue
- WriteRequest wreq = queue.peek();
+ if (readCount < 0) {
+ // session closed by the remote peer
+ LOG.debug("session closed by the remote peer");
+ close(true);
+ } else {
+ // we have read some data
+ // limit at the current position & rewind buffer back to start &
+ // push to the chain
+ readBuffer.flip();
+
+ if (isSecured()) {
+ // We are reading data over a SSL/TLS encrypted connection.
+ // Redirect
+ // the processing to the SslHelper class.
+ SslHelper sslHelper = getAttribute(SSL_HELPER, null);
+
+ if (sslHelper == null) {
+ throw new IllegalStateException();
+ }
+
+ sslHelper.processRead(this, readBuffer);
+ } else {
+ // Plain message, not encrypted : go directly to the chain
+ processMessageReceived(readBuffer);
+ }
- if (wreq == null) {
- break;
+ idleChecker.sessionRead(this, System.currentTimeMillis());
}
+ } catch (IOException e) {
+ LOG.error("Exception while reading : ", e);
+ }
- ByteBuffer buf = (ByteBuffer) wreq.getMessage();
+ }
+ if (write) {
+ try {
+ LOG.debug("ready for write");
+ LOG.debug("writable session : {}", this);
- // Note that if the connection is secured, the buffer
- // already
- // contains encrypted data.
- int wrote = getSocketChannel().write(buf);
- incrementWrittenBytes(wrote);
- LOG.debug("wrote {} bytes to {}", wrote, this);
-
- idleChecker.sessionWritten(this, System.currentTimeMillis());
-
- if (buf.remaining() == 0) {
- // completed write request, let's remove it
- queue.remove();
- // complete the future
- DefaultWriteFuture future = (DefaultWriteFuture) wreq.getFuture();
+ setNotRegisteredForWrite();
- if (future != null) {
- future.complete();
- }
- } else {
- // output socket buffer is full, we need
- // to give up until next selection for
- // writing
- break;
- }
- } while (!queue.isEmpty());
+ // write from the session write queue
+ boolean isEmpty = false;
- isEmpty = queue.isEmpty();
- } finally {
- this.releaseWriteQueue();
- }
+ try {
+ Queue<WriteRequest> queue = acquireWriteQueue();
- // if the session is no more interested in writing, we need
- // to stop listening for OP_WRITE events
- if (isEmpty) {
- if (isClosing()) {
- LOG.debug("closing session {} have empty write queue, so we close it", this);
- // we was flushing writes, now we to the close
- getSocketChannel().close();
- } else {
- // no more write event needed
- processor.cancelKeyForWritting(this);
+ do {
+ // get a write request from the queue
+ WriteRequest wreq = queue.peek();
+
+ if (wreq == null) {
+ break;
+ }
+
+ ByteBuffer buf = (ByteBuffer) wreq.getMessage();
+
+ // Note that if the connection is secured, the buffer
+ // already
+ // contains encrypted data.
+ int wrote = getSocketChannel().write(buf);
+ incrementWrittenBytes(wrote);
+ LOG.debug("wrote {} bytes to {}", wrote, this);
+
+ idleChecker.sessionWritten(this, System.currentTimeMillis());
+
+ if (buf.remaining() == 0) {
+ // completed write request, let's remove it
+ queue.remove();
+ // complete the future
+ DefaultWriteFuture future = (DefaultWriteFuture) wreq.getFuture();
+
+ if (future != null) {
+ future.complete();
+ }
+ } else {
+ // output socket buffer is full, we need
+ // to give up until next selection for
+ // writing
+ break;
+ }
+ } while (!queue.isEmpty());
+
+ isEmpty = queue.isEmpty();
+ } finally {
+ this.releaseWriteQueue();
+ }
+
+ // if the session is no more interested in writing, we need
+ // to stop listening for OP_WRITE events
+ if (isEmpty) {
+ if (isClosing()) {
+ LOG.debug("closing session {} have empty write queue, so we close it", this);
+ // we was flushing writes, now we to the close
+ channelClose();
+ } else {
+ // no more write event needed
+ selectorLoop.modifyRegistration(false, !isReadSuspended(), false, this, channel);
+ }
+ }
+ } catch (IOException e) {
+ LOG.error("Exception while reading : ", e);
}
}
+ if (accept) {
+ throw new IllegalStateException("accept event should never occur on NioTcpSession");
+ }
}
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpServer.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpServer.java?rev=1392037&r1=1392036&r2=1392037&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpServer.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpServer.java Sun Sep 30 15:35:05 2012
@@ -28,7 +28,8 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.mina.api.IoSessionConfig;
-import org.apache.mina.service.SelectorStrategy;
+import org.apache.mina.service.idlechecker.IdleChecker;
+import org.apache.mina.service.idlechecker.IndexedIdleChecker;
import org.apache.mina.transport.udp.AbstractUdpServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,18 +39,18 @@ import org.slf4j.LoggerFactory;
*
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
-public class NioUdpServer extends AbstractUdpServer implements SelectorEventListener {
+public class NioUdpServer extends AbstractUdpServer implements SelectorListener {
static final Logger LOG = LoggerFactory.getLogger(NioUdpServer.class);
// the bound local address
private SocketAddress address = null;
- // the strategy for dispatching servers and client to selector threads.
- private final SelectorStrategy<NioSelectorProcessor> strategy;
-
// the processor used for read and write this server
- private NioSelectorProcessor processor;
+ private final NioSelectorLoop selectorLoop;
+
+ // used for detecting idle sessions
+ private final IdleChecker idleChecker = new IndexedIdleChecker();
// the inner channel for read/write UDP datagrams
private DatagramChannel datagramChannel = null;
@@ -63,9 +64,9 @@ public class NioUdpServer extends Abstra
/**
* Create a new instance of NioUdpServer
*/
- public NioUdpServer(final SelectorStrategy<NioSelectorProcessor> strategy) {
+ public NioUdpServer(final NioSelectorLoop selectorLoop) {
super();
- this.strategy = strategy;
+ this.selectorLoop = selectorLoop;
}
/**
@@ -119,9 +120,8 @@ public class NioUdpServer extends Abstra
datagramChannel.socket().bind(address);
datagramChannel.configureBlocking(false);
- processor = this.strategy.getSelectorForBindNewAddress();
-
- processor.addServer(this);
+ selectorLoop.register(false, true, false, this, datagramChannel);
+ selectorLoop.incrementServiceCount();
// it's the first address bound, let's fire the event
this.fireServiceActivated();
@@ -136,11 +136,13 @@ public class NioUdpServer extends Abstra
if (this.address == null) {
throw new IllegalStateException("server not bound");
}
+
+ selectorLoop.unregister(this, datagramChannel);
+ selectorLoop.decrementServiceCount();
+
datagramChannel.socket().close();
datagramChannel.close();
- processor.removeServer(this);
-
this.address = null;
this.fireServiceInactivated();
}
@@ -160,44 +162,35 @@ public class NioUdpServer extends Abstra
}
/**
- * {@inheritDoc}
- */
- @Override
- public void acceptReady(NioSelectorProcessor processor) {
- throw new IllegalStateException("accept event should never occur on NioUdpServer");
- }
-
- /**
- * {@inheritDoc}
- */
+ * {@inheritDoc}
+ */
@Override
- public void readReady(NioSelectorProcessor processor, ByteBuffer readBuffer) throws IOException {
- LOG.debug("readable datagram for UDP service : {}", this);
- readBuffer.clear();
-
- SocketAddress source = datagramChannel.receive(readBuffer);
- readBuffer.flip();
-
- LOG.debug("read {} bytes form {}", readBuffer.remaining(), source);
-
- // let's find the corresponding session
-
- NioUdpSession session = sessions.get(source);
- if (session == null) {
- session = new NioUdpSession(this, strategy.getSelectorForNewSession(processor), processor.getIdleChecker(),
- address, source);
+ public void ready(boolean accept, boolean read, ByteBuffer readBuffer, boolean write) {
+ if (read) {
+ try {
+ LOG.debug("readable datagram for UDP service : {}", this);
+ readBuffer.clear();
+
+ SocketAddress source = datagramChannel.receive(readBuffer);
+ readBuffer.flip();
+
+ LOG.debug("read {} bytes form {}", readBuffer.remaining(), source);
+
+ // let's find the corresponding session
+
+ NioUdpSession session = sessions.get(source);
+ if (session == null) {
+ session = new NioUdpSession(this, idleChecker, address, source);
+ }
+
+ session.receivedDatagram(readBuffer);
+ } catch (IOException ex) {
+ LOG.error("IOException while reading the socket", ex);
+ }
+ }
+ if (write) {
+ // TODO : flush session
}
-
- session.receivedDatagram(readBuffer);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void writeReady(NioSelectorProcessor processor) throws IOException {
- // TODO : flush the sessions
-
}
}
\ No newline at end of file
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpSession.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpSession.java?rev=1392037&r1=1392036&r2=1392037&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpSession.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpSession.java Sun Sep 30 15:35:05 2012
@@ -27,6 +27,7 @@ import org.apache.mina.api.IoService;
import org.apache.mina.api.IoSessionConfig;
import org.apache.mina.api.RuntimeIoException;
import org.apache.mina.service.idlechecker.IdleChecker;
+import org.apache.mina.session.AbstractIoSession;
import org.apache.mina.util.AbstractIoFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,7 +36,7 @@ import org.slf4j.LoggerFactory;
* A UDP session based on NIO
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
-public class NioUdpSession extends AbstractNioSession {
+public class NioUdpSession extends AbstractIoSession {
private static final Logger LOG = LoggerFactory.getLogger(NioUdpSession.class);
@@ -61,9 +62,9 @@ public class NioUdpSession extends Abstr
* @param writeProcessor
* @param idleChecker
*/
- public NioUdpSession(IoService service, NioSelectorProcessor writeProcessor, IdleChecker idleChecker,
- SocketAddress localAddress, SocketAddress remoteAddress) {
- super(service, writeProcessor, idleChecker);
+ public NioUdpSession(IoService service, IdleChecker idleChecker, SocketAddress localAddress,
+ SocketAddress remoteAddress) {
+ super(service, idleChecker);
this.localAddress = localAddress;
this.remoteAddress = remoteAddress;
}
@@ -72,6 +73,22 @@ public class NioUdpSession extends Abstr
* {@inheritDoc}
*/
@Override
+ protected void channelClose() {
+ // No inner socket to close for UDP
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void flushWriteQueue() {
+ // TODO flush queue
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public SocketAddress getRemoteAddress() {
return remoteAddress;
}
Added: mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorListener.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorListener.java?rev=1392037&view=auto
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorListener.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorListener.java Sun Sep 30 15:35:05 2012
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.mina.transport.nio;
+
+import java.nio.ByteBuffer;
+
+/**
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ */
+public interface SelectorListener {
+
+ void ready(boolean accept, boolean read, ByteBuffer readBuffer, boolean write);
+
+}
Propchange: mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorListener.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorLoop.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorLoop.java?rev=1392037&view=auto
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorLoop.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorLoop.java Sun Sep 30 15:35:05 2012
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.mina.transport.nio;
+
+import java.nio.channels.SelectableChannel;
+
+/**
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ */
+public interface SelectorLoop {
+
+ public abstract void register(boolean accept, boolean read, boolean write, SelectorListener listener,
+ SelectableChannel channel);
+
+ public abstract void modifyRegistration(boolean accept, boolean read, boolean write, SelectorListener listener,
+ SelectableChannel channel);
+
+ public abstract void unregister(SelectorListener listener, SelectableChannel channel);
+
+ public abstract void incrementServiceCount();
+
+ public abstract void decrementServiceCount();
+
+}
\ No newline at end of file
Propchange: mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorLoop.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: mina/trunk/core/src/test/java/org/apache/mina/service/idlecheker/IndexedIdleChekerTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/service/idlecheker/IndexedIdleChekerTest.java?rev=1392037&r1=1392036&r2=1392037&view=diff
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/service/idlecheker/IndexedIdleChekerTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/service/idlecheker/IndexedIdleChekerTest.java Sun Sep 30 15:35:05 2012
@@ -21,8 +21,8 @@
*/
package org.apache.mina.service.idlecheker;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
import java.net.SocketAddress;
@@ -30,7 +30,6 @@ import org.apache.mina.api.IdleStatus;
import org.apache.mina.api.IoFuture;
import org.apache.mina.api.IoService;
import org.apache.mina.api.IoSessionConfig;
-import org.apache.mina.service.SelectorProcessor;
import org.apache.mina.service.idlechecker.IdleChecker;
import org.apache.mina.service.idlechecker.IndexedIdleChecker;
import org.apache.mina.session.AbstractIoSession;
@@ -85,8 +84,6 @@ public class IndexedIdleChekerTest {
assertEquals(1, session.writeIdleCount);
}
- private final SelectorProcessor processor = mock(SelectorProcessor.class);
-
private class DummySession extends AbstractIoSession {
int readIdleCount = 0;
@@ -94,7 +91,7 @@ public class IndexedIdleChekerTest {
int writeIdleCount = 0;
private DummySession(IoService service, IdleChecker checker) {
- super(service, processor, checker);
+ super(service, checker);
}
@Override
@@ -179,5 +176,21 @@ public class IndexedIdleChekerTest {
// TODO Auto-generated method stub
return false;
}
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected void channelClose() {
+
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void flushWriteQueue() {
+
+ }
}
}
Modified: mina/trunk/core/src/test/java/org/apache/mina/session/AbstractIoSessionTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/session/AbstractIoSessionTest.java?rev=1392037&r1=1392036&r2=1392037&view=diff
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/session/AbstractIoSessionTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/session/AbstractIoSessionTest.java Sun Sep 30 15:35:05 2012
@@ -18,16 +18,9 @@
*/
package org.apache.mina.session;
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertNotNull;
-import static junit.framework.Assert.assertNull;
-import static junit.framework.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static junit.framework.Assert.*;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
@@ -41,7 +34,6 @@ import org.apache.mina.api.IoService;
import org.apache.mina.api.IoSessionConfig;
import org.apache.mina.filterchain.ReadFilterChainController;
import org.apache.mina.filterchain.WriteFilterChainController;
-import org.apache.mina.service.SelectorProcessor;
import org.junit.Before;
import org.junit.Test;
@@ -52,11 +44,9 @@ import org.junit.Test;
*/
public class AbstractIoSessionTest {
- private final SelectorProcessor processor = mock(SelectorProcessor.class);
-
private class DummySession extends AbstractIoSession {
private DummySession(IoService service) {
- super(service, processor, null);
+ super(service, null);
}
@Override
@@ -122,9 +112,23 @@ public class AbstractIoSessionTest {
@Override
public boolean isClosed() {
- // TODO Auto-generated method stub
return false;
}
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected void channelClose() {
+
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void flushWriteQueue() {
+ }
}
private IoService service = null;
@@ -200,7 +204,6 @@ public class AbstractIoSessionTest {
verify(filter1).messageWriting(eq(session), eq(buffer), any(WriteFilterChainController.class));
verify(filter2).messageWriting(eq(session), eq(buffer), any(WriteFilterChainController.class));
verify(filter3).messageWriting(eq(session), eq(buffer), any(WriteFilterChainController.class));
- verify(processor).flush(eq(session));
}
@Test
Modified: mina/trunk/examples/src/main/java/org/apache/mina/examples/echoserver/NioEchoServer.java
URL: http://svn.apache.org/viewvc/mina/trunk/examples/src/main/java/org/apache/mina/examples/echoserver/NioEchoServer.java?rev=1392037&r1=1392036&r2=1392037&view=diff
==============================================================================
--- mina/trunk/examples/src/main/java/org/apache/mina/examples/echoserver/NioEchoServer.java (original)
+++ mina/trunk/examples/src/main/java/org/apache/mina/examples/echoserver/NioEchoServer.java Sun Sep 30 15:35:05 2012
@@ -33,8 +33,7 @@ import org.apache.mina.api.IoSession;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.filterchain.ReadFilterChainController;
import org.apache.mina.filterchain.WriteFilterChainController;
-import org.apache.mina.service.OneThreadSelectorStrategy;
-import org.apache.mina.transport.nio.NioSelectorProcessor;
+import org.apache.mina.transport.nio.NioSelectorLoop;
import org.apache.mina.transport.nio.NioTcpServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,9 +51,7 @@ public class NioEchoServer {
public static void main(String[] args) {
LOG.info("starting echo server");
- OneThreadSelectorStrategy<NioSelectorProcessor> strategy = new OneThreadSelectorStrategy<NioSelectorProcessor>(new NioSelectorProcessor());
-
- NioTcpServer acceptor = new NioTcpServer(strategy);
+ NioTcpServer acceptor = new NioTcpServer(new NioSelectorLoop(), new NioSelectorLoop());
// create the fitler chain for this service
acceptor.setFilters(new LoggingFilter("LoggingFilter1"), new IoFilter() {
Modified: mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpTest.java?rev=1392037&r1=1392036&r2=1392037&view=diff
==============================================================================
--- mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpTest.java (original)
+++ mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpTest.java Sun Sep 30 15:35:05 2012
@@ -38,8 +38,7 @@ import org.apache.mina.http.api.HttpMeth
import org.apache.mina.http.api.HttpRequest;
import org.apache.mina.http.api.HttpStatus;
import org.apache.mina.http.api.HttpVersion;
-import org.apache.mina.service.OneThreadSelectorStrategy;
-import org.apache.mina.transport.nio.NioSelectorProcessor;
+import org.apache.mina.transport.nio.NioSelectorLoop;
import org.apache.mina.transport.nio.NioTcpServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,9 +49,7 @@ public class HttpTest {
public static void main(String[] args) throws Exception {
- OneThreadSelectorStrategy<NioSelectorProcessor> strategy = new OneThreadSelectorStrategy<NioSelectorProcessor>(new NioSelectorProcessor());
-
- NioTcpServer acceptor = new NioTcpServer(strategy);
+ NioTcpServer acceptor = new NioTcpServer(new NioSelectorLoop(), new NioSelectorLoop());
acceptor.setFilters(new LoggingFilter("INCOMING"), new HttpServerCodec(), new LoggingFilter("DECODED"),
new DummyHttpSever());
Modified: mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpsTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpsTest.java?rev=1392037&r1=1392036&r2=1392037&view=diff
==============================================================================
--- mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpsTest.java (original)
+++ mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpsTest.java Sun Sep 30 15:35:05 2012
@@ -38,8 +38,7 @@ import org.apache.mina.http.api.HttpMeth
import org.apache.mina.http.api.HttpRequest;
import org.apache.mina.http.api.HttpStatus;
import org.apache.mina.http.api.HttpVersion;
-import org.apache.mina.service.OneThreadSelectorStrategy;
-import org.apache.mina.transport.nio.NioSelectorProcessor;
+import org.apache.mina.transport.nio.NioSelectorLoop;
import org.apache.mina.transport.nio.NioTcpServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,8 +49,7 @@ public class HttpsTest {
public static void main(String[] args) throws Exception {
- OneThreadSelectorStrategy<NioSelectorProcessor> strategy = new OneThreadSelectorStrategy<NioSelectorProcessor>(new NioSelectorProcessor());
- NioTcpServer acceptor = new NioTcpServer(strategy);
+ NioTcpServer acceptor = new NioTcpServer(new NioSelectorLoop(), new NioSelectorLoop());
acceptor.setFilters(new LoggingFilter("INCOMING"), new HttpServerCodec(), new LoggingFilter("DECODED"),
new DummyHttpSever());
Modified: mina/trunk/examples/src/main/java/org/apache/mina/examples/ldap/LdapTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/examples/src/main/java/org/apache/mina/examples/ldap/LdapTest.java?rev=1392037&r1=1392036&r2=1392037&view=diff
==============================================================================
--- mina/trunk/examples/src/main/java/org/apache/mina/examples/ldap/LdapTest.java (original)
+++ mina/trunk/examples/src/main/java/org/apache/mina/examples/ldap/LdapTest.java Sun Sep 30 15:35:05 2012
@@ -39,8 +39,7 @@ import org.apache.mina.api.IoSession;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.filterchain.ReadFilterChainController;
import org.apache.mina.ldap.LdapCodec;
-import org.apache.mina.service.OneThreadSelectorStrategy;
-import org.apache.mina.transport.nio.NioSelectorProcessor;
+import org.apache.mina.transport.nio.NioSelectorLoop;
import org.apache.mina.transport.nio.NioTcpServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,8 +52,7 @@ public class LdapTest {
public static void main(String[] args) throws Exception {
LdapTest ldapServer = new LdapTest();
- OneThreadSelectorStrategy<NioSelectorProcessor> strategy = new OneThreadSelectorStrategy<NioSelectorProcessor>(new NioSelectorProcessor());
- NioTcpServer acceptor = new NioTcpServer(strategy);
+ NioTcpServer acceptor = new NioTcpServer(new NioSelectorLoop(), new NioSelectorLoop());
acceptor.setFilters(new LoggingFilter("INCOMING"), new LdapCodec(), new LoggingFilter("DECODED"),
ldapServer.new DummyLdapServer());
Modified: mina/trunk/examples/src/main/java/org/apache/mina/examples/udpecho/NioUdpEchoServer.java
URL: http://svn.apache.org/viewvc/mina/trunk/examples/src/main/java/org/apache/mina/examples/udpecho/NioUdpEchoServer.java?rev=1392037&r1=1392036&r2=1392037&view=diff
==============================================================================
--- mina/trunk/examples/src/main/java/org/apache/mina/examples/udpecho/NioUdpEchoServer.java (original)
+++ mina/trunk/examples/src/main/java/org/apache/mina/examples/udpecho/NioUdpEchoServer.java Sun Sep 30 15:35:05 2012
@@ -32,8 +32,7 @@ import org.apache.mina.api.IoSession;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.filterchain.ReadFilterChainController;
import org.apache.mina.filterchain.WriteFilterChainController;
-import org.apache.mina.service.OneThreadSelectorStrategy;
-import org.apache.mina.transport.nio.NioSelectorProcessor;
+import org.apache.mina.transport.nio.NioSelectorLoop;
import org.apache.mina.transport.nio.NioUdpServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,10 +48,7 @@ public class NioUdpEchoServer {
public static void main(String[] args) {
LOG.info("starting echo server");
- OneThreadSelectorStrategy<NioSelectorProcessor> strategy = new OneThreadSelectorStrategy<NioSelectorProcessor>(
- new NioSelectorProcessor());
-
- NioUdpServer server = new NioUdpServer(strategy);
+ NioUdpServer server = new NioUdpServer(new NioSelectorLoop());
// create the fitler chain for this service
server.setFilters(new LoggingFilter("LoggingFilter1"), new IoFilter() {