You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2007/09/16 16:16:17 UTC
svn commit: r576109 - in /mina/trunk/core/src/main/java/org/apache/mina:
common/ transport/socket/nio/
Author: trustin
Date: Sun Sep 16 07:16:16 2007
New Revision: 576109
URL: http://svn.apache.org/viewvc?rev=576109&view=rev
Log:
* Replaced SocketIoProcessor with NIOProcessor
* DatagramConnector now uses NIOProcessor (TODO: make it multi-threaded)
* Added NIOSession
Added:
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NIOProcessor.java (with props)
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NIOSession.java (with props)
Removed:
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java?rev=576109&r1=576108&r2=576109&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java Sun Sep 16 07:16:16 2007
@@ -147,11 +147,11 @@
doAdd(session);
addedSessions ++;
- // AbstractIoFilterChain.CONNECT_FUTURE is cleared inside here
+ // DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here
// in AbstractIoFilterChain.fireSessionOpened().
((AbstractIoService) session.getService()).getListeners().fireSessionCreated(session);
} catch (Exception e) {
- // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute
+ // Clear the DefaultIoFilterChain.CONNECT_FUTURE attribute
// and call ConnectFuture.setException().
session.getFilterChain().fireExceptionCaught(session, e);
}
@@ -206,11 +206,12 @@
}
private void process(AbstractIoSession session) throws Exception {
- if (((readyOps(session) & SelectionKey.OP_READ) != 0) && session.getTrafficMask().isReadable()) {
+ int readyOps = readyOps(session);
+ if (((readyOps & SelectionKey.OP_READ) != 0) && session.getTrafficMask().isReadable()) {
read(session);
}
- if (((readyOps(session) & SelectionKey.OP_WRITE) != 0) && session.getTrafficMask().isWritable()) {
+ if (((readyOps & SelectionKey.OP_WRITE) != 0) && session.getTrafficMask().isWritable()) {
scheduleFlush(session);
}
}
@@ -224,8 +225,15 @@
int ret;
try {
- while ((ret = read(session, buf)) > 0) {
- readBytes += ret;
+ if (session.getTransportMetadata().hasFragmentation()) {
+ while ((ret = read(session, buf)) > 0) {
+ readBytes += ret;
+ }
+ } else {
+ ret = read(session, buf);
+ if (ret > 0) {
+ readBytes = ret;
+ }
}
} finally {
buf.flip();
@@ -237,16 +245,18 @@
session.getFilterChain().fireMessageReceived(session, buf);
buf = null;
- if (readBytes * 2 < config.getReadBufferSize()) {
- if (config.getReadBufferSize() > config.getMinReadBufferSize()) {
- config.setReadBufferSize(config.getReadBufferSize() >>> 1);
- }
- } else if (readBytes == config.getReadBufferSize()) {
- int newReadBufferSize = config.getReadBufferSize() << 1;
- if (newReadBufferSize <= (config.getMaxReadBufferSize())) {
- config.setReadBufferSize(newReadBufferSize);
- } else {
- config.setReadBufferSize(config.getMaxReadBufferSize());
+ if (!session.getTransportMetadata().hasFragmentation()) {
+ if (readBytes * 2 < config.getReadBufferSize()) {
+ if (config.getReadBufferSize() > config.getMinReadBufferSize()) {
+ config.setReadBufferSize(config.getReadBufferSize() >>> 1);
+ }
+ } else if (readBytes == config.getReadBufferSize()) {
+ int newReadBufferSize = config.getReadBufferSize() << 1;
+ if (newReadBufferSize <= (config.getMaxReadBufferSize())) {
+ config.setReadBufferSize(newReadBufferSize);
+ } else {
+ config.setReadBufferSize(config.getMaxReadBufferSize());
+ }
}
}
}
@@ -423,7 +433,7 @@
region.setPosition(region.getPosition() + localWrittenBytes);
writtenBytes += localWrittenBytes;
}
-
+
if (region.getCount() > 0 || writtenBytes >= maxWrittenBytes) {
// Kernel buffer is full or wrote too much.
interestOps(session, interestOps(session) | SelectionKey.OP_WRITE);
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java?rev=576109&r1=576108&r2=576109&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java Sun Sep 16 07:16:16 2007
@@ -22,27 +22,17 @@
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.DatagramChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.util.Iterator;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import org.apache.mina.common.AbstractIoConnector;
-import org.apache.mina.common.DefaultIoFilterChain;
-import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.DefaultConnectFuture;
+import org.apache.mina.common.DefaultIoFilterChain;
import org.apache.mina.common.ExceptionMonitor;
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoProcessor;
import org.apache.mina.common.IoSession;
-import org.apache.mina.common.RuntimeIOException;
import org.apache.mina.common.TransportMetadata;
-import org.apache.mina.common.WriteRequest;
-import org.apache.mina.util.NamePreservingRunnable;
import org.apache.mina.util.NewThreadExecutor;
/**
@@ -54,23 +44,9 @@
public class DatagramConnector extends AbstractIoConnector {
private static volatile int nextId = 0;
- private final Executor executor;
-
private final int id = nextId++;
- private final Selector selector;
-
- private final IoProcessor processor = new DatagramConnectorProcessor();
-
- private final Queue<RegistrationRequest> registerQueue = new ConcurrentLinkedQueue<RegistrationRequest>();
-
- private final Queue<DatagramSessionImpl> cancelQueue = new ConcurrentLinkedQueue<DatagramSessionImpl>();
-
- private final Queue<DatagramSessionImpl> flushingSessions = new ConcurrentLinkedQueue<DatagramSessionImpl>();
-
- private final Queue<DatagramSessionImpl> trafficControllingSessions = new ConcurrentLinkedQueue<DatagramSessionImpl>();
-
- private Worker worker;
+ private final IoProcessor processor;
/**
* Creates a new instance.
@@ -84,24 +60,8 @@
*/
public DatagramConnector(Executor executor) {
super(new DefaultDatagramSessionConfig());
-
- try {
- this.selector = Selector.open();
- } catch (IOException e) {
- throw new RuntimeIOException("Failed to open a selector.", e);
- }
-
- this.executor = executor;
- }
-
- @Override
- protected void finalize() throws Throwable {
- super.finalize();
- try {
- selector.close();
- } catch (IOException e) {
- ExceptionMonitor.getInstance().exceptionCaught(e);
- }
+
+ processor = new NIOProcessor("DatagramConnector-" + id, executor);
}
public TransportMetadata getTransportMetadata() {
@@ -118,6 +78,7 @@
SocketAddress localAddress) {
DatagramChannel ch = null;
boolean initialized = false;
+ IoSession session = null;
try {
ch = DatagramChannel.open();
DatagramSessionConfig cfg = getSessionConfig();
@@ -135,9 +96,17 @@
ch.socket().bind(localAddress);
}
ch.connect(remoteAddress);
- ch.configureBlocking(false);
+
+ session = new DatagramSessionImpl(this, ch, getHandler());
+ getFilterChainBuilder().buildFilterChain(session.getFilterChain());
+ ConnectFuture future = new DefaultConnectFuture();
+ // DefaultIoFilterChain will notify the connect future.
+ session.setAttribute(DefaultIoFilterChain.CONNECT_FUTURE, future);
+
+ processor.add(session);
initialized = true;
- } catch (IOException e) {
+ return future;
+ } catch (Exception e) {
return DefaultConnectFuture.newFailedFuture(e);
} finally {
if (!initialized && ch != null) {
@@ -149,341 +118,9 @@
}
}
}
-
- RegistrationRequest request = new RegistrationRequest(ch);
-
- startupWorker();
- registerQueue.add(request);
- selector.wakeup();
-
- return request;
}
IoProcessor getProcessor() {
return processor;
- }
-
- private class DatagramConnectorProcessor implements IoProcessor {
- public void add(IoSession session) {
- }
-
- public void flush(IoSession session, WriteRequest writeRequest) {
- if (scheduleFlush((DatagramSessionImpl) session)) {
- Selector selector = DatagramConnector.this.selector;
- if (selector != null) {
- selector.wakeup();
- }
- }
- }
-
- public void remove(IoSession session) {
- startupWorker();
- cancelQueue.add((DatagramSessionImpl) session);
- selector.wakeup();
- }
-
- public void updateTrafficMask(IoSession session) {
- scheduleTrafficControl((DatagramSessionImpl) session);
- Selector selector = DatagramConnector.this.selector;
- if (selector != null) {
- selector.wakeup();
- }
- selector.wakeup();
- }
- }
-
- private synchronized void startupWorker() {
- if (worker == null) {
- worker = new Worker();
- executor.execute(new NamePreservingRunnable(worker));
- }
- }
-
- private boolean scheduleFlush(DatagramSessionImpl session) {
- if (session.setScheduledForFlush(true)) {
- flushingSessions.add(session);
- return true;
- } else {
- return false;
- }
- }
-
- private void scheduleTrafficControl(DatagramSessionImpl session) {
- trafficControllingSessions.add(session);
- }
-
- private void doUpdateTrafficMask() {
- for (; ;) {
- DatagramSessionImpl session = trafficControllingSessions.poll();
- if (session == null) {
- break;
- }
-
- SelectionKey key = session.getSelectionKey();
- // Retry later if session is not yet fully initialized.
- // (In case that Session.suspend??() or session.resume??() is
- // called before addSession() is processed)
- if (key == null) {
- scheduleTrafficControl(session);
- break;
- }
- // skip if channel is already closed
- if (!key.isValid()) {
- continue;
- }
-
- // The normal is OP_READ and, if there are write requests in the
- // session's write queue, set OP_WRITE to trigger flushing.
- int ops = SelectionKey.OP_READ;
- Queue<WriteRequest> writeRequestQueue = session
- .getWriteRequestQueue();
- synchronized (writeRequestQueue) {
- if (!writeRequestQueue.isEmpty()) {
- ops |= SelectionKey.OP_WRITE;
- }
- }
-
- // Now mask the preferred ops with the mask of the current session
- int mask = session.getTrafficMask().getInterestOps();
- key.interestOps(ops & mask);
- }
- }
-
- private class Worker implements Runnable {
- public void run() {
- Thread.currentThread().setName("DatagramConnector-" + id);
-
- for (; ;) {
- try {
- int nKeys = selector.select();
-
- registerNew();
- doUpdateTrafficMask();
-
- if (nKeys > 0) {
- processReadySessions(selector.selectedKeys());
- }
-
- flushSessions();
- cancelKeys();
-
- if (selector.keys().isEmpty()) {
- synchronized (DatagramConnector.this) {
- if (selector.keys().isEmpty()
- && registerQueue.isEmpty()
- && cancelQueue.isEmpty()) {
- worker = null;
- break;
- }
- }
- }
- } catch (IOException e) {
- ExceptionMonitor.getInstance().exceptionCaught(e);
-
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e1) {
- }
- }
- }
- }
- }
-
- private void processReadySessions(Set<SelectionKey> keys) {
- Iterator<SelectionKey> it = keys.iterator();
- while (it.hasNext()) {
- SelectionKey key = it.next();
- it.remove();
-
- DatagramSessionImpl session = (DatagramSessionImpl) key
- .attachment();
-
- if (key.isReadable() && session.getTrafficMask().isReadable()) {
- readSession(session);
- }
-
- if (key.isWritable() && session.getTrafficMask().isWritable()) {
- scheduleFlush(session);
- }
- }
- }
-
- private void readSession(DatagramSessionImpl session) {
-
- ByteBuffer readBuf = ByteBuffer.allocate(session.getReadBufferSize());
- try {
- int readBytes = session.getChannel().read(readBuf.buf());
- if (readBytes > 0) {
- readBuf.flip();
- ByteBuffer newBuf = ByteBuffer.allocate(readBuf.limit());
- newBuf.put(readBuf);
- newBuf.flip();
-
- session.increaseReadBytes(readBytes);
- session.getFilterChain().fireMessageReceived(session, newBuf);
- }
- } catch (IOException e) {
- session.getFilterChain().fireExceptionCaught(session, e);
- }
- }
-
- private void flushSessions() {
- for (; ;) {
- DatagramSessionImpl session = flushingSessions.poll();
- if (session == null) {
- break;
- }
-
- session.setScheduledForFlush(false);
-
- try {
- boolean flushedAll = flush(session);
- if (flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.isScheduledForFlush()) {
- scheduleFlush(session);
- }
- } catch (IOException e) {
- session.getFilterChain().fireExceptionCaught(session, e);
- }
- }
- }
-
- private boolean flush(DatagramSessionImpl session) throws IOException {
- // Clear OP_WRITE
- SelectionKey key = session.getSelectionKey();
- if (key == null) {
- scheduleFlush(session);
- return false;
- }
- if (!key.isValid()) {
- return false;
- }
- key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
-
- DatagramChannel ch = session.getChannel();
- Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
-
- int writtenBytes = 0;
- int maxWrittenBytes = session.getConfig().getSendBufferSize() << 1;
- try {
- for (; ;) {
- WriteRequest req;
- synchronized (writeRequestQueue) {
- req = writeRequestQueue.peek();
- }
-
- if (req == null) {
- break;
- }
-
- ByteBuffer buf = (ByteBuffer) req.getMessage();
- if (buf.remaining() == 0) {
- // pop and fire event
- synchronized (writeRequestQueue) {
- writeRequestQueue.poll();
- }
-
- session.increaseWrittenMessages();
- buf.reset();
- session.getFilterChain().fireMessageSent(session, req);
- continue;
- }
-
- int localWrittenBytes = ch.write(buf.buf());
- if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
- // Kernel buffer is full or wrote too much
- key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
- return false;
- } else {
- key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
-
- // pop and fire event
- synchronized (writeRequestQueue) {
- writeRequestQueue.poll();
- }
-
- writtenBytes += localWrittenBytes;
- session.increaseWrittenMessages();
- buf.reset();
- session.getFilterChain().fireMessageSent(session, req);
- }
- }
- } finally {
- session.increaseWrittenBytes(writtenBytes);
- }
-
- return true;
- }
-
- private void registerNew() {
- for (; ;) {
- RegistrationRequest req = registerQueue.poll();
- if (req == null) {
- break;
- }
-
- DatagramSessionImpl session = new DatagramSessionImpl(
- this, req.channel, getHandler());
-
- // AbstractIoFilterChain will notify the connect future.
- session.setAttribute(DefaultIoFilterChain.CONNECT_FUTURE, req);
-
- boolean success = false;
- try {
- SelectionKey key = req.channel.register(selector,
- SelectionKey.OP_READ, session);
-
- session.setSelectionKey(key);
- buildFilterChain(session);
- // The CONNECT_FUTURE attribute is cleared and notified here.
- getListeners().fireSessionCreated(session);
- success = true;
- } catch (Throwable t) {
- // The CONNECT_FUTURE attribute is cleared and notified here.
- session.getFilterChain().fireExceptionCaught(session, t);
- } finally {
- if (!success) {
- try {
- req.channel.disconnect();
- req.channel.close();
- } catch (IOException e) {
- ExceptionMonitor.getInstance().exceptionCaught(e);
- }
- }
- }
- }
- }
-
- private void buildFilterChain(IoSession session) throws Exception {
- getFilterChainBuilder().buildFilterChain(session.getFilterChain());
- }
-
- private void cancelKeys() {
- for (; ;) {
- DatagramSessionImpl session = cancelQueue.poll();
- if (session == null) {
- break;
- } else {
- SelectionKey key = session.getSelectionKey();
- DatagramChannel ch = (DatagramChannel) key.channel();
- try {
- ch.disconnect();
- ch.close();
- } catch (IOException e) {
- ExceptionMonitor.getInstance().exceptionCaught(e);
- }
-
- getListeners().fireSessionDestroyed(session);
- key.cancel();
- selector.wakeup(); // wake up again to trigger thread death
- }
- }
- }
-
- private static class RegistrationRequest extends DefaultConnectFuture {
- private final DatagramChannel channel;
-
- private RegistrationRequest(DatagramChannel channel) {
- this.channel = channel;
- }
}
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java?rev=576109&r1=576108&r2=576109&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java Sun Sep 16 07:16:16 2007
@@ -25,7 +25,6 @@
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
-import org.apache.mina.common.AbstractIoSession;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.CloseFuture;
import org.apache.mina.common.DefaultIoFilterChain;
@@ -46,7 +45,7 @@
* @author The Apache MINA Project (dev@mina.apache.org)
* @version $Rev$, $Date$
*/
-class DatagramSessionImpl extends AbstractIoSession implements DatagramSession {
+class DatagramSessionImpl extends NIOSession implements DatagramSession {
static final TransportMetadata METADATA =
new DefaultTransportMetadata(
@@ -70,8 +69,6 @@
private SelectionKey key;
- private int readBufferSize;
-
/**
* Creates a new acceptor instance.
*/
@@ -180,10 +177,6 @@
return (InetSocketAddress) super.getServiceAddress();
}
- int getReadBufferSize() {
- return readBufferSize;
- }
-
private class SessionConfigImpl extends AbstractDatagramSessionConfig {
public int getReceiveBufferSize() {
@@ -200,7 +193,7 @@
ch.socket().setReceiveBufferSize(receiveBufferSize);
// Re-retrieve the effective receive buffer size.
receiveBufferSize = ch.socket().getReceiveBufferSize();
- DatagramSessionImpl.this.readBufferSize = receiveBufferSize;
+ DatagramSessionImpl.this.config.setReadBufferSize(receiveBufferSize);
} catch (SocketException e) {
throw new RuntimeIOException(e);
}
Added: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NIOProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NIOProcessor.java?rev=576109&view=auto
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NIOProcessor.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NIOProcessor.java Sun Sep 16 07:16:16 2007
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import java.io.IOException;
+import java.nio.channels.ByteChannel;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.Executor;
+
+import org.apache.mina.common.AbstractIoProcessor;
+import org.apache.mina.common.AbstractIoSession;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.FileRegion;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.RuntimeIOException;
+
+/**
+ *
+ * @author Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev$, $Date$
+ */
+class NIOProcessor extends AbstractIoProcessor {
+
+ protected final Selector selector;
+
+ NIOProcessor(String threadName, Executor executor) {
+ super(threadName, executor);
+
+ try {
+ this.selector = Selector.open();
+ } catch (IOException e) {
+ throw new RuntimeIOException("Failed to open a selector.", e);
+ }
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+ super.finalize();
+ try {
+ selector.close();
+ } catch (IOException e) {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ }
+
+ @Override
+ protected int select(int timeout) throws Exception {
+ return selector.select(1000);
+ }
+
+ @Override
+ protected void wakeup() {
+ selector.wakeup();
+ }
+
+ @Override
+ protected Iterator<AbstractIoSession> allSessions() throws Exception {
+ return new IoSessionIterator(selector.keys());
+ }
+
+ protected Iterator<AbstractIoSession> selectedSessions() throws Exception {
+ return new IoSessionIterator(selector.selectedKeys());
+ }
+
+ @Override
+ protected void doAdd(IoSession session) throws Exception {
+ SelectableChannel ch = (SelectableChannel) getChannel(session);
+ ch.configureBlocking(false);
+ setSelectionKey(
+ session,
+ ch.register(selector, SelectionKey.OP_READ, session));
+ }
+
+ @Override
+ protected void doRemove(IoSession session) throws Exception {
+ ByteChannel ch = getChannel(session);
+ SelectionKey key = getSelectionKey(session);
+ key.cancel();
+ ch.close();
+ }
+
+ @Override
+ protected SessionState state(IoSession session) {
+ SelectionKey key = getSelectionKey(session);
+ if (key == null) {
+ return SessionState.PREPARING;
+ }
+
+ return key.isValid()? SessionState.OPEN : SessionState.CLOSED;
+ }
+
+ @Override
+ protected int readyOps(IoSession session) throws Exception {
+ return getSelectionKey(session).readyOps();
+ }
+
+ @Override
+ protected int interestOps(IoSession session) throws Exception {
+ return getSelectionKey(session).interestOps();
+ }
+
+ @Override
+ protected void interestOps(IoSession session, int interestOps) throws Exception {
+ getSelectionKey(session).interestOps(interestOps);
+ }
+
+ @Override
+ protected int read(IoSession session, ByteBuffer buf) throws Exception {
+ return getChannel(session).read(buf.buf());
+ }
+
+ @Override
+ protected int write(IoSession session, ByteBuffer buf) throws Exception {
+ return getChannel(session).write(buf.buf());
+ }
+
+ @Override
+ protected long transferFile(IoSession session, FileRegion region) throws Exception {
+ return region.getFileChannel().transferTo(region.getPosition(), region.getCount(), getChannel(session));
+ }
+
+ private ByteChannel getChannel(IoSession session) {
+ return ((NIOSession) session).getChannel();
+ }
+
+ private SelectionKey getSelectionKey(IoSession session) {
+ return ((NIOSession) session).getSelectionKey();
+ }
+
+ private void setSelectionKey(IoSession session, SelectionKey key) {
+ ((NIOSession) session).setSelectionKey(key);
+ }
+
+
+ protected static class IoSessionIterator implements Iterator<AbstractIoSession> {
+ private final Iterator<SelectionKey> i;
+ private IoSessionIterator(Set<SelectionKey> keys) {
+ i = keys.iterator();
+ }
+ public boolean hasNext() {
+ return i.hasNext();
+ }
+
+ public AbstractIoSession next() {
+ SelectionKey key = i.next();
+ return (AbstractIoSession) key.attachment();
+ }
+
+ public void remove() {
+ i.remove();
+ }
+ }
+}
\ No newline at end of file
Propchange: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NIOProcessor.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NIOProcessor.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NIOSession.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NIOSession.java?rev=576109&view=auto
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NIOSession.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NIOSession.java Sun Sep 16 07:16:16 2007
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import java.nio.channels.ByteChannel;
+import java.nio.channels.SelectionKey;
+
+import org.apache.mina.common.AbstractIoSession;
+
+/**
+ *
+ * @author Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev$, $Date$
+ */
+abstract class NIOSession extends AbstractIoSession {
+ abstract ByteChannel getChannel();
+ abstract SelectionKey getSelectionKey();
+ abstract void setSelectionKey(SelectionKey key);
+}
Propchange: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NIOSession.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NIOSession.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java?rev=576109&r1=576108&r2=576109&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java Sun Sep 16 07:16:16 2007
@@ -74,7 +74,7 @@
private final Queue<CancellationRequest> cancelQueue = new ConcurrentLinkedQueue<CancellationRequest>();
- private final SocketIoProcessor[] ioProcessors;
+ private final NIOProcessor[] ioProcessors;
private final int processorCount;
@@ -145,12 +145,12 @@
// Set other properties and initialize
this.executor = executor;
this.processorCount = processorCount;
- ioProcessors = new SocketIoProcessor[processorCount];
+ ioProcessors = new NIOProcessor[processorCount];
// create an array of SocketIoProcessors that will be used for
// handling sessions.
for (int i = 0; i < processorCount; i++) {
- ioProcessors[i] = new SocketIoProcessor(
+ ioProcessors[i] = new NIOProcessor(
"SocketAcceptorIoProcessor-" + id + "." + i, executor);
}
}
@@ -433,7 +433,7 @@
}
}
- private SocketIoProcessor nextProcessor() {
+ private NIOProcessor nextProcessor() {
if (this.processorDistributor == Integer.MAX_VALUE) {
this.processorDistributor = Integer.MAX_VALUE % this.processorCount;
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java?rev=576109&r1=576108&r2=576109&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java Sun Sep 16 07:16:16 2007
@@ -62,7 +62,7 @@
private final Queue<ConnectionRequest> connectQueue = new ConcurrentLinkedQueue<ConnectionRequest>();
- private final SocketIoProcessor[] ioProcessors;
+ private final NIOProcessor[] ioProcessors;
private final int processorCount;
@@ -104,10 +104,10 @@
this.executor = executor;
this.processorCount = processorCount;
- ioProcessors = new SocketIoProcessor[processorCount];
+ ioProcessors = new NIOProcessor[processorCount];
for (int i = 0; i < processorCount; i++) {
- ioProcessors[i] = new SocketIoProcessor(
+ ioProcessors[i] = new NIOProcessor(
"SocketConnectorIoProcessor-" + id + "." + i, executor);
}
}
@@ -295,7 +295,7 @@
session.getProcessor().add(session);
}
- private SocketIoProcessor nextProcessor() {
+ private NIOProcessor nextProcessor() {
if (this.processorDistributor == Integer.MAX_VALUE) {
this.processorDistributor = Integer.MAX_VALUE % this.processorCount;
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java?rev=576109&r1=576108&r2=576109&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java Sun Sep 16 07:16:16 2007
@@ -24,7 +24,6 @@
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
-import org.apache.mina.common.AbstractIoSession;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.DefaultIoFilterChain;
import org.apache.mina.common.DefaultTransportMetadata;
@@ -43,7 +42,7 @@
* @author The Apache MINA Project (dev@mina.apache.org)
* @version $Rev$, $Date$
*/
-class SocketSessionImpl extends AbstractIoSession implements SocketSession {
+class SocketSessionImpl extends NIOSession implements SocketSession {
static final TransportMetadata METADATA =
new DefaultTransportMetadata(