You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2007/09/16 14:54:45 UTC
svn commit: r576096 - in /mina/trunk/core/src/main/java/org/apache/mina:
common/AbstractIoProcessor.java common/AbstractIoSession.java
common/AbstractIoSessionConfig.java common/IoSessionConfig.java
transport/socket/nio/SocketIoProcessor.java
Author: trustin
Date: Sun Sep 16 05:54:44 2007
New Revision: 576096
URL: http://svn.apache.org/viewvc?rev=576096&view=rev
Log:
* Added AbstractIoProcessor; extracted from SocketIoProcessor
* Rewrote SocketIoProcessor to use AbstractIoProcessor
* Added IoSessionConfig properties
** readBufferSize
** minReadBufferSize
** maxReadBufferSize
Added:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java (with props)
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSessionConfig.java
mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionConfig.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
Added: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java?rev=576096&view=auto
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java Sun Sep 16 05:54:44 2007
@@ -0,0 +1,552 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.common;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+
+import org.apache.mina.util.NamePreservingRunnable;
+
+/**
+ * An abstract implementation of {@link IoProcessor} which helps
+ * transport developers to write an {@link IoProcessor} easily.
+ *
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev$, $Date$
+ *
+ * TODO Provide abstraction for bind, unbind and connect (+ cancellation)
+ */
+public abstract class AbstractIoProcessor implements IoProcessor {
+
+ // Monitor guarding reads and writes of 'worker' (see startupWorker() and Worker.run()).
+ private final Object lock = new Object();
+ // Name set on the thread that executes the worker loop.
+ private final String threadName;
+ // Executor on which the worker is submitted.
+ private final Executor executor;
+
+ // Filled by add(IoSession); drained by the worker in add().
+ private final Queue<AbstractIoSession> newSessions =
+ new ConcurrentLinkedQueue<AbstractIoSession>();
+ // Filled by scheduleRemove(); drained by the worker in remove().
+ private final Queue<AbstractIoSession> removingSessions =
+ new ConcurrentLinkedQueue<AbstractIoSession>();
+ // Filled by scheduleFlush(); drained by the worker in flush().
+ private final Queue<AbstractIoSession> flushingSessions =
+ new ConcurrentLinkedQueue<AbstractIoSession>();
+ // Filled by scheduleTrafficControl(); drained by the worker in updateTrafficMask().
+ private final Queue<AbstractIoSession> trafficControllingSessions =
+ new ConcurrentLinkedQueue<AbstractIoSession>();
+
+ // The currently running worker, or null when none is active.
+ private Worker worker;
+ // System.currentTimeMillis() value recorded at the last idle check.
+ private long lastIdleCheckTime;
+
+    /**
+     * Creates a processor whose worker loop is submitted to the given executor.
+     *
+     * @param threadName the name set on the thread while it runs the worker loop
+     * @param executor   the executor used to run the worker
+     */
+    protected AbstractIoProcessor(String threadName, Executor executor) {
+        this.executor = executor;
+        this.threadName = threadName;
+    }
+
+ /**
+ * Waits for I/O readiness. The worker loop calls this with 1000 and calls
+ * process() only when the returned value is positive.
+ * NOTE(review): the timeout unit is presumably milliseconds - confirm with implementations.
+ */
+ protected abstract int select(int timeout) throws Exception;
+
+ /**
+ * Called after work has been queued (add, remove, flush, traffic mask update).
+ * NOTE(review): presumably interrupts a blocking select() - confirm with implementations.
+ */
+ protected abstract void wakeup();
+
+ /** Returns an iterator over the sessions that are checked for idleness. */
+ protected abstract Iterator<AbstractIoSession> allSessions() throws Exception;
+
+ /**
+ * Returns an iterator over the sessions to process after select().
+ * process() calls Iterator.remove() on it after handling each session.
+ */
+ protected abstract Iterator<AbstractIoSession> selectedSessions() throws Exception;
+
+ /** Returns the current {@link SessionState} of the given session. */
+ protected abstract SessionState state(IoSession session);
+
+ /** Returns the ready operations of the session; tested against SelectionKey.OP_READ / OP_WRITE. */
+ protected abstract int readyOps(IoSession session) throws Exception;
+
+ /** Returns the current interest operations of the session; tested against SelectionKey.OP_WRITE. */
+ protected abstract int interestOps(IoSession session) throws Exception;
+
+ /** Replaces the interest operations of the session with the given bit set. */
+ protected abstract void interestOps(IoSession session, int interestOps) throws Exception;
+
+ /** Called by the worker for each session taken from the new-session queue. */
+ protected abstract void doAdd(IoSession session) throws Exception;
+
+ /** Called by the worker for each OPEN session taken from the removal queue. */
+ protected abstract void doRemove(IoSession session) throws Exception;
+
+ /**
+ * Reads from the session into buf. The caller keeps reading while the result
+ * is positive and schedules the session for removal when it is negative.
+ */
+ protected abstract int read(IoSession session, ByteBuffer buf) throws Exception;
+
+ /** Writes buf to the session; the result is added to the written byte count. */
+ protected abstract int write(IoSession session, ByteBuffer buf) throws Exception;
+
+ /** Transfers part of the region to the session; the result is used to advance the region's position. */
+ protected abstract long transferFile(IoSession session, FileRegion region) throws Exception;
+
+    /**
+     * Queues the session for registration and makes sure a worker is running.
+     *
+     * @param session the session to add; must be an {@link AbstractIoSession}
+     */
+    public void add(IoSession session) {
+        AbstractIoSession newSession = (AbstractIoSession) session;
+        newSessions.add(newSession);
+        startupWorker();
+    }
+
+    /**
+     * Queues the session for removal and makes sure a worker is running.
+     *
+     * @param session the session to remove; must be an {@link AbstractIoSession}
+     */
+    public void remove(IoSession session) {
+        AbstractIoSession closing = (AbstractIoSession) session;
+        scheduleRemove(closing);
+        startupWorker();
+    }
+
+    /**
+     * Schedules the session for flushing. wakeup() is called only when the
+     * flush queue was empty beforehand and this call actually enqueued the session.
+     *
+     * @param session      the session to flush; must be an {@link AbstractIoSession}
+     * @param writeRequest not used by this implementation
+     */
+    public void flush(IoSession session, WriteRequest writeRequest) {
+        // Sample the queue before scheduling, otherwise it would never look empty.
+        boolean queueWasEmpty = flushingSessions.isEmpty();
+        boolean newlyScheduled = scheduleFlush((AbstractIoSession) session);
+        if (newlyScheduled && queueWasEmpty) {
+            wakeup();
+        }
+    }
+
+    /**
+     * Queues the session for a traffic mask update and calls wakeup().
+     *
+     * @param session the session whose traffic mask changed; must be an {@link AbstractIoSession}
+     */
+    public void updateTrafficMask(IoSession session) {
+        AbstractIoSession target = (AbstractIoSession) session;
+        scheduleTrafficControl(target);
+        wakeup();
+    }
+
+    /**
+     * Starts a worker on the executor if none is registered, then calls wakeup().
+     * The check and the assignment of {@code worker} happen under {@code lock}.
+     */
+    private void startupWorker() {
+        synchronized (lock) {
+            if (worker == null) {
+                Worker created = new Worker();
+                worker = created;
+                executor.execute(new NamePreservingRunnable(created));
+            }
+        }
+        wakeup();
+    }
+
+ /** Appends the session to the removal queue; the worker handles it in remove(). */
+ private void scheduleRemove(AbstractIoSession session) {
+ removingSessions.add(session);
+ }
+
+    /**
+     * Enqueues the session for flushing unless
+     * {@code session.setScheduledForFlush(true)} returns false.
+     *
+     * @return true if the session was enqueued by this call, false otherwise
+     */
+    private boolean scheduleFlush(AbstractIoSession session) {
+        if (!session.setScheduledForFlush(true)) {
+            return false;
+        }
+        flushingSessions.add(session);
+        return true;
+    }
+
+ /** Appends the session to the traffic control queue; the worker handles it in updateTrafficMask(). */
+ private void scheduleTrafficControl(AbstractIoSession session) {
+ trafficControllingSessions.add(session);
+ }
+
+    /**
+     * Drains the new-session queue, registering each session through
+     * doAdd() and then firing sessionCreated on the owning service's listeners.
+     * Any exception thrown along the way is forwarded to the session's filter chain.
+     *
+     * @return the number of sessions for which doAdd() returned normally
+     */
+    private int add() {
+        int registered = 0;
+        AbstractIoSession session;
+        while ((session = newSessions.poll()) != null) {
+            try {
+                doAdd(session);
+                registered++;
+
+                // Original author's note: AbstractIoFilterChain.CONNECT_FUTURE is
+                // cleared in AbstractIoFilterChain.fireSessionOpened() as part of this call.
+                ((AbstractIoService) session.getService()).getListeners().fireSessionCreated(session);
+            } catch (Exception e) {
+                // Original author's note: this clears the CONNECT_FUTURE attribute
+                // and calls ConnectFuture.setException().
+                session.getFilterChain().fireExceptionCaught(session, e);
+            }
+        }
+        return registered;
+    }
+
+    /**
+     * Drains the removal queue. OPEN sessions are removed through doRemove(),
+     * their pending writes are cleared and sessionDestroyed is fired; CLOSED
+     * sessions are skipped. When a PREPARING session is met it is re-queued and
+     * the method returns immediately so the removal is retried on a later pass.
+     *
+     * @return the number of sessions for which doRemove() returned normally
+     */
+    private int remove() {
+        int closed = 0;
+        AbstractIoSession session;
+        while ((session = removingSessions.poll()) != null) {
+            SessionState state = state(session);
+            switch (state) {
+            case OPEN:
+                try {
+                    doRemove(session);
+                    closed++;
+                } catch (Exception e) {
+                    session.getFilterChain().fireExceptionCaught(session, e);
+                } finally {
+                    // Runs whether or not doRemove() succeeded.
+                    clearWriteRequestQueue(session);
+                    ((AbstractIoService) session.getService()).getListeners().fireSessionDestroyed(session);
+                }
+                break;
+            case CLOSED:
+                // Nothing to do: already closed.
+                break;
+            case PREPARING:
+                // Not fully initialized yet (close requested before the
+                // session was added); try again later.
+                scheduleRemove(session);
+                return closed;
+            default:
+                throw new IllegalStateException(String.valueOf(state));
+            }
+        }
+        return closed;
+    }
+
+    /**
+     * Processes every session returned by selectedSessions(), removing each
+     * one from the iterator once it has been handled.
+     */
+    private void process() throws Exception {
+        Iterator<AbstractIoSession> selected = selectedSessions();
+        while (selected.hasNext()) {
+            AbstractIoSession session = selected.next();
+            process(session);
+            selected.remove();
+        }
+    }
+
+    /**
+     * Reads from the session when it is read-ready and its traffic mask allows
+     * reading, then schedules a flush when it is write-ready and its traffic
+     * mask allows writing.
+     */
+    private void process(AbstractIoSession session) throws Exception {
+        boolean readReady = (readyOps(session) & SelectionKey.OP_READ) != 0;
+        if (readReady && session.getTrafficMask().isReadable()) {
+            read(session);
+        }
+
+        // Ready ops are queried again, after the read has taken place.
+        boolean writeReady = (readyOps(session) & SelectionKey.OP_WRITE) != 0;
+        if (writeReady && session.getTrafficMask().isWritable()) {
+            scheduleFlush(session);
+        }
+    }
+
+    /**
+     * Reads from the session into a freshly allocated buffer until
+     * read(session, buf) stops returning a positive count, forwards the data
+     * to the filter chain and then adapts the configured read buffer size.
+     * A negative read result schedules the session for removal. An
+     * IOException schedules removal and is reported to the filter chain; any
+     * other Throwable is only reported.
+     */
+    private void read(AbstractIoSession session) {
+        IoSessionConfig config = session.getConfig();
+        ByteBuffer buf = ByteBuffer.allocate(config.getReadBufferSize());
+
+        try {
+            int readBytes = 0;
+            int ret;
+
+            try {
+                while ((ret = read(session, buf)) > 0) {
+                    readBytes += ret;
+                }
+            } finally {
+                // Flip even when the read failed part-way.
+                buf.flip();
+            }
+
+            session.increaseReadBytes(readBytes);
+
+            if (readBytes > 0) {
+                session.getFilterChain().fireMessageReceived(session, buf);
+                adjustReadBufferSize(config, readBytes);
+            }
+            if (ret < 0) {
+                scheduleRemove(session);
+            }
+        } catch (IOException e) {
+            scheduleRemove(session);
+            session.getFilterChain().fireExceptionCaught(session, e);
+        } catch (Throwable e) {
+            session.getFilterChain().fireExceptionCaught(session, e);
+        }
+    }
+
+    /**
+     * Halves the read buffer size when less than half of it was used and
+     * doubles it when it was filled completely, keeping the result between
+     * the configured minimum and maximum.
+     */
+    private static void adjustReadBufferSize(IoSessionConfig config, int readBytes) {
+        int current = config.getReadBufferSize();
+        if (readBytes * 2L < current) {
+            int min = config.getMinReadBufferSize();
+            if (current > min) {
+                // Clamp: plain halving could drop below the minimum when the
+                // current size is not a power-of-two multiple of it.
+                config.setReadBufferSize(Math.max(current >>> 1, min));
+            }
+        } else if (readBytes == current) {
+            int max = config.getMaxReadBufferSize();
+            // Compare against max / 2 so that doubling cannot overflow int.
+            config.setReadBufferSize(current <= (max >>> 1) ? current << 1 : max);
+        }
+    }
+
+    /**
+     * Runs the idle check over all sessions, but only when at least 1000 ms
+     * have passed since the previous check. An exception raised for one
+     * session is reported to that session's filter chain and does not stop
+     * the iteration.
+     */
+    private void notifyIdleness() throws Exception {
+        long now = System.currentTimeMillis();
+        if ((now - lastIdleCheckTime) < 1000) {
+            return;
+        }
+
+        lastIdleCheckTime = now;
+        Iterator<AbstractIoSession> sessions = allSessions();
+        while (sessions.hasNext()) {
+            AbstractIoSession session = sessions.next();
+            try {
+                notifyIdleness(session, now);
+            } catch (Exception e) {
+                session.getFilterChain().fireExceptionCaught(session, e);
+            }
+        }
+    }
+
+    /**
+     * Checks one session for BOTH, READER and WRITER idleness (in that order)
+     * and then for a write timeout. The reference time of each idle check is
+     * the later of the last matching I/O time and the last time that idle
+     * status was recorded.
+     */
+    private void notifyIdleness(AbstractIoSession session, long currentTime) throws Exception {
+        long bothIdleTime = session.getConfig().getIdleTimeInMillis(IdleStatus.BOTH_IDLE);
+        long bothSince = Math.max(session.getLastIoTime(),
+                session.getLastIdleTime(IdleStatus.BOTH_IDLE));
+        notifyIdleness0(session, currentTime, bothIdleTime, IdleStatus.BOTH_IDLE, bothSince);
+
+        long readerIdleTime = session.getConfig().getIdleTimeInMillis(IdleStatus.READER_IDLE);
+        long readerSince = Math.max(session.getLastReadTime(),
+                session.getLastIdleTime(IdleStatus.READER_IDLE));
+        notifyIdleness0(session, currentTime, readerIdleTime, IdleStatus.READER_IDLE, readerSince);
+
+        long writerIdleTime = session.getConfig().getIdleTimeInMillis(IdleStatus.WRITER_IDLE);
+        long writerSince = Math.max(session.getLastWriteTime(),
+                session.getLastIdleTime(IdleStatus.WRITER_IDLE));
+        notifyIdleness0(session, currentTime, writerIdleTime, IdleStatus.WRITER_IDLE, writerSince);
+
+        long writeTimeout = session.getConfig().getWriteTimeoutInMillis();
+        notifyWriteTimeout(session, currentTime, writeTimeout, session.getLastWriteTime());
+    }
+
+    /**
+     * Fires sessionIdle for the given status when idle detection is enabled
+     * (idleTime &gt; 0), a reference time exists (lastIoTime != 0) and at
+     * least idleTime has elapsed since that reference time.
+     */
+    private void notifyIdleness0(AbstractIoSession session, long currentTime,
+            long idleTime, IdleStatus status, long lastIoTime) {
+        if (idleTime <= 0 || lastIoTime == 0) {
+            return;
+        }
+        if ((currentTime - lastIoTime) < idleTime) {
+            return;
+        }
+        session.increaseIdleCount(status);
+        session.getFilterChain().fireSessionIdle(session, status);
+    }
+
+    /**
+     * Reports a {@link WriteTimeoutException} to the session's filter chain
+     * when a write timeout is configured, it has elapsed since lastIoTime and
+     * the session is still interested in OP_WRITE.
+     */
+    private void notifyWriteTimeout(AbstractIoSession session,
+            long currentTime, long writeTimeout, long lastIoTime) throws Exception {
+        boolean expired = writeTimeout > 0 && (currentTime - lastIoTime) >= writeTimeout;
+        // interestOps() is queried only once the timeout has expired.
+        if (expired && (interestOps(session) & SelectionKey.OP_WRITE) != 0) {
+            WriteTimeoutException cause = new WriteTimeoutException();
+            session.getFilterChain().fireExceptionCaught(session, cause);
+        }
+    }
+
+    /**
+     * Drains the flush queue. Disconnected sessions get their pending writes
+     * cleared. OPEN sessions are flushed and, if the flush completed while
+     * requests are still queued, scheduled again. CLOSED sessions are skipped.
+     * When a PREPARING session is met it is re-queued and the method returns
+     * so the flush is retried on a later pass. A failing flush schedules the
+     * session for removal and reports the exception to its filter chain.
+     */
+    private void flush() {
+        // isEmpty() instead of size() == 0: ConcurrentLinkedQueue.size()
+        // traverses the whole queue, and this runs on every worker iteration.
+        if (flushingSessions.isEmpty()) {
+            return;
+        }
+
+        for (;;) {
+            AbstractIoSession session = flushingSessions.poll();
+
+            if (session == null) {
+                break;
+            }
+
+            // Allow the session to be scheduled again from here on.
+            session.setScheduledForFlush(false);
+
+            if (!session.isConnected()) {
+                clearWriteRequestQueue(session);
+                continue;
+            }
+
+            SessionState state = state(session);
+            switch (state) {
+            case OPEN:
+                try {
+                    boolean flushedAll = flush(session);
+                    if (flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.isScheduledForFlush()) {
+                        scheduleFlush(session);
+                    }
+                } catch (Exception e) {
+                    scheduleRemove(session);
+                    session.getFilterChain().fireExceptionCaught(session, e);
+                }
+                break;
+            case CLOSED:
+                // Skip if the channel is already closed.
+                break;
+            case PREPARING:
+                // Retry later if the session is not yet fully initialized
+                // (a write was requested before the session was added).
+                scheduleFlush(session);
+                return;
+            default:
+                throw new IllegalStateException(String.valueOf(state));
+            }
+        }
+    }
+
+    /**
+     * Empties the session's write request queue. The first request is
+     * forwarded to the filter chain as "sent" when its message is a
+     * ByteBuffer with nothing remaining; in every other case, and for all
+     * following requests, the write future is marked as not written.
+     */
+    private void clearWriteRequestQueue(AbstractIoSession session) {
+        Queue<WriteRequest> pending = session.getWriteRequestQueue();
+        WriteRequest head = pending.poll();
+        if (head == null) {
+            return;
+        }
+
+        Object message = head.getMessage();
+        // Original author's note: the first unwritten empty buffer must be
+        // forwarded to the filter chain.
+        boolean emptyBuffer = message instanceof ByteBuffer
+                && !((ByteBuffer) message).hasRemaining();
+        if (emptyBuffer) {
+            session.getFilterChain().fireMessageSent(session, head);
+        } else {
+            head.getFuture().setWritten(false);
+        }
+
+        // Discard the rest.
+        for (WriteRequest rest = pending.poll(); rest != null; rest = pending.poll()) {
+            rest.getFuture().setWritten(false);
+        }
+    }
+
+ /**
+ * Writes queued requests of one session until the queue is empty, a write
+ * leaves data unwritten, or the per-call byte limit is reached.
+ * OP_WRITE interest is cleared on entry and set again whenever the method
+ * gives up early.
+ *
+ * @return false when OP_WRITE interest was set again because data remains
+ * or the byte limit was hit inside the loop body; true otherwise
+ */
+ private boolean flush(AbstractIoSession session) throws Exception {
+ // Clear OP_WRITE
+ interestOps(session, interestOps(session) & (~SelectionKey.OP_WRITE));
+
+ Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
+
+ // Set limitation for the number of written bytes for read-write
+ // fairness.
+ // The limit is taken from the maximum READ buffer size.
+ int maxWrittenBytes = session.getConfig().getMaxReadBufferSize();
+ int writtenBytes = 0;
+
+ try {
+ do {
+ // Check for pending writes.
+ WriteRequest req = writeRequestQueue.peek();
+
+ if (req == null) {
+ break;
+ }
+
+ Object message = req.getMessage();
+ if (message instanceof FileRegion) {
+ FileRegion region = (FileRegion) message;
+
+ if (region.getCount() <= 0) {
+ // File has been sent, remove from queue
+ writeRequestQueue.poll();
+ session.increaseWrittenMessages();
+ session.getFilterChain().fireMessageSent(session, req);
+ continue;
+ }
+
+ if ((readyOps(session) & SelectionKey.OP_WRITE) != 0) {
+ long localWrittenBytes = transferFile(session, region);
+ region.setPosition(region.getPosition() + localWrittenBytes);
+ // NOTE(review): compound assignment silently narrows the long to int.
+ writtenBytes += localWrittenBytes;
+ }
+
+ if (region.getCount() > 0 || writtenBytes >= maxWrittenBytes) {
+ // Kernel buffer is full or wrote too much.
+ interestOps(session, interestOps(session) | SelectionKey.OP_WRITE);
+ return false;
+ }
+
+ } else {
+ // Any message that is not a FileRegion is cast to ByteBuffer.
+ ByteBuffer buf = (ByteBuffer) message;
+ if (buf.remaining() == 0) {
+ // Buffer has been completely sent, remove request from queue
+ writeRequestQueue.poll();
+
+ session.increaseWrittenMessages();
+
+ // NOTE(review): presumably rewinds to a mark set when the message was queued - confirm.
+ buf.reset();
+ session.getFilterChain().fireMessageSent(session, req);
+ continue;
+ }
+
+ if ((readyOps(session) & SelectionKey.OP_WRITE) != 0) {
+ writtenBytes += write(session, buf);
+ }
+
+ if (buf.hasRemaining() || writtenBytes >= maxWrittenBytes) {
+ // Kernel buffer is full or wrote too much.
+ interestOps(session, interestOps(session) | SelectionKey.OP_WRITE);
+ return false;
+ }
+ }
+ } while (writtenBytes < maxWrittenBytes);
+ } finally {
+ // Account for the bytes written even when an exception is thrown.
+ session.increaseWrittenBytes(writtenBytes);
+ }
+
+ return true;
+ }
+
+    /**
+     * Drains the traffic control queue. For OPEN sessions the interest ops
+     * are set to OP_READ (plus OP_WRITE when write requests are pending),
+     * restricted by the session's traffic mask. CLOSED sessions are skipped.
+     * When a PREPARING session is met it is re-queued and the method returns
+     * so the update is retried on a later pass.
+     */
+    private void updateTrafficMask() {
+        AbstractIoSession session;
+        while ((session = trafficControllingSessions.poll()) != null) {
+            SessionState state = state(session);
+            switch (state) {
+            case OPEN:
+                // Read interest is the default; add write interest only if
+                // there is something queued to flush.
+                int wanted = session.getWriteRequestQueue().isEmpty()
+                        ? SelectionKey.OP_READ
+                        : SelectionKey.OP_READ | SelectionKey.OP_WRITE;
+
+                // Restrict the wanted ops to what the traffic mask permits.
+                int allowed = session.getTrafficMask().getInterestOps();
+                try {
+                    interestOps(session, wanted & allowed);
+                } catch (Exception e) {
+                    session.getFilterChain().fireExceptionCaught(session, e);
+                }
+                break;
+            case CLOSED:
+                break;
+            case PREPARING:
+                // Not fully initialized yet (traffic mask changed before the
+                // session was added); try again later.
+                scheduleTrafficControl(session);
+                return;
+            default:
+                throw new IllegalStateException(String.valueOf(state));
+            }
+        }
+    }
+
+ /**
+ * The processing loop. Each iteration selects, registers new sessions,
+ * applies traffic mask updates, processes selected sessions, flushes,
+ * removes sessions and checks idleness. The loop ends once no session is
+ * managed and no new session is queued.
+ */
+ private class Worker implements Runnable {
+ public void run() {
+ // Sessions added minus sessions removed by this worker.
+ int nSessions = 0;
+
+ Thread.currentThread().setName(AbstractIoProcessor.this.threadName);
+ lastIdleCheckTime = System.currentTimeMillis();
+
+ for (;;) {
+ try {
+ int nKeys = select(1000);
+
+ nSessions += add();
+ updateTrafficMask();
+
+ if (nKeys > 0) {
+ process();
+ }
+
+ flush();
+ nSessions -= remove();
+ notifyIdleness();
+
+ if (nSessions == 0) {
+ // Same lock as startupWorker(): the emptiness check and clearing
+ // 'worker' happen together, so a later startupWorker() sees null
+ // and starts a new worker.
+ synchronized (lock) {
+ if (newSessions.isEmpty()) {
+ worker = null;
+ break;
+ }
+ }
+ }
+ } catch (Throwable t) {
+ ExceptionMonitor.getInstance().exceptionCaught(t);
+
+ // Pause for a second before the next iteration after a failure.
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e1) {
+ ExceptionMonitor.getInstance().exceptionCaught(e1);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * State of a session as reported by state(IoSession). The worker acts on
+ * OPEN sessions, skips CLOSED ones and re-queues PREPARING ones.
+ */
+ protected static enum SessionState {
+ // Session can be flushed, removed and have its interest ops updated.
+ OPEN,
+ // Session is already closed; queued work for it is dropped.
+ CLOSED,
+ // Session is not yet fully initialized; queued work for it is retried later.
+ PREPARING,
+ }
+}
Propchange: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java?rev=576096&r1=576095&r2=576096&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java Sun Sep 16 05:54:44 2007
@@ -68,17 +68,17 @@
*/
private final CloseFuture closeFuture = new DefaultCloseFuture(this);
+ private volatile boolean closing;
+
+ private TrafficMask trafficMask = TrafficMask.ALL;
+
+ // Status variables
private final AtomicBoolean scheduledForFlush = new AtomicBoolean();
private final AtomicLong scheduledWriteBytes = new AtomicLong();
private final AtomicInteger scheduledWriteMessages = new AtomicInteger();
- private volatile boolean closing;
-
- private TrafficMask trafficMask = TrafficMask.ALL;
-
- // Status variables
private long readBytes;
private long writtenBytes;
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSessionConfig.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSessionConfig.java?rev=576096&r1=576095&r2=576096&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSessionConfig.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSessionConfig.java Sun Sep 16 05:54:44 2007
@@ -28,6 +28,9 @@
*/
public abstract class AbstractIoSessionConfig implements IoSessionConfig {
+ private int minReadBufferSize = 64;
+ private int readBufferSize = 2048;
+ private int maxReadBufferSize = 65536;
private int idleTimeForRead;
private int idleTimeForWrite;
private int idleTimeForBoth;
@@ -41,6 +44,9 @@
throw new NullPointerException("config");
}
+ setReadBufferSize(config.getReadBufferSize());
+ setMinReadBufferSize(config.getMinReadBufferSize());
+ setMaxReadBufferSize(config.getMaxReadBufferSize());
setIdleTime(IdleStatus.BOTH_IDLE, config.getIdleTime(IdleStatus.BOTH_IDLE));
setIdleTime(IdleStatus.READER_IDLE, config.getIdleTime(IdleStatus.READER_IDLE));
setIdleTime(IdleStatus.WRITER_IDLE, config.getIdleTime(IdleStatus.WRITER_IDLE));
@@ -54,6 +60,48 @@
* properties retrieved from the specified <tt>config</tt>.
*/
protected abstract void doSetAll(IoSessionConfig config);
+
+ /** Returns the current read buffer size (the field is initialized to 2048). */
+ public int getReadBufferSize() {
+ return readBufferSize;
+ }
+
+    /**
+     * Sets the read buffer size. The value is not checked against the
+     * minimum or maximum read buffer size.
+     *
+     * @throws IllegalArgumentException if readBufferSize is zero or negative
+     */
+    public void setReadBufferSize(int readBufferSize) {
+        if (readBufferSize < 1) {
+            String message = "readBufferSize: " + readBufferSize + " (expected: 1+)";
+            throw new IllegalArgumentException(message);
+        }
+        this.readBufferSize = readBufferSize;
+    }
+
+ /** Returns the minimum read buffer size (the field is initialized to 64). */
+ public int getMinReadBufferSize() {
+ return minReadBufferSize;
+ }
+
+    /**
+     * Sets the minimum read buffer size.
+     *
+     * @throws IllegalArgumentException if minReadBufferSize is zero or
+     *         negative, or greater than the current maximum read buffer size
+     */
+    public void setMinReadBufferSize(int minReadBufferSize) {
+        if (minReadBufferSize < 1) {
+            throw new IllegalArgumentException(
+                    "minReadBufferSize: " + minReadBufferSize + " (expected: 1+)");
+        }
+        // A value equal to the maximum is accepted.
+        if (maxReadBufferSize < minReadBufferSize) {
+            throw new IllegalArgumentException(
+                    "minReadBufferSize: " + minReadBufferSize
+                    + " (expected: smaller than " + maxReadBufferSize + ')');
+        }
+        this.minReadBufferSize = minReadBufferSize;
+    }
+
+ /** Returns the maximum read buffer size (the field is initialized to 65536). */
+ public int getMaxReadBufferSize() {
+ return maxReadBufferSize;
+ }
+
+    /**
+     * Sets the maximum read buffer size.
+     *
+     * @throws IllegalArgumentException if maxReadBufferSize is zero or
+     *         negative, or smaller than the current minimum read buffer size
+     */
+    public void setMaxReadBufferSize(int maxReadBufferSize) {
+        if (maxReadBufferSize < 1) {
+            throw new IllegalArgumentException(
+                    "maxReadBufferSize: " + maxReadBufferSize + " (expected: 1+)");
+        }
+        // A value equal to the minimum is accepted.
+        if (minReadBufferSize > maxReadBufferSize) {
+            throw new IllegalArgumentException(
+                    "maxReadBufferSize: " + maxReadBufferSize
+                    + " (expected: greater than " + minReadBufferSize + ')');
+        }
+        this.maxReadBufferSize = maxReadBufferSize;
+    }
public int getIdleTime(IdleStatus status) {
if (status == IdleStatus.BOTH_IDLE) {
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionConfig.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionConfig.java?rev=576096&r1=576095&r2=576096&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionConfig.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionConfig.java Sun Sep 16 05:54:44 2007
@@ -26,6 +26,49 @@
* @version $Rev$, $Date$
*/
public interface IoSessionConfig {
+
+ /**
+ * Returns the size of the read buffer that the I/O processor allocates
+ * for each read. Adjusting this property manually is rarely necessary
+ * because the I/O processor often adjusts it automatically.
+ */
+ int getReadBufferSize();
+
+ /**
+ * Sets the size of the read buffer that the I/O processor allocates
+ * for each read. Adjusting this property manually is rarely necessary
+ * because the I/O processor often adjusts it automatically.
+ */
+ void setReadBufferSize(int readBufferSize);
+
+ /**
+ * Returns the minimum size of the read buffer that the I/O processor
+ * allocates for each read. The I/O processor will not decrease the
+ * read buffer size below this value.
+ */
+ int getMinReadBufferSize();
+
+ /**
+ * Sets the minimum size of the read buffer that the I/O processor
+ * allocates for each read. The I/O processor will not decrease the
+ * read buffer size below this value.
+ */
+ void setMinReadBufferSize(int minReadBufferSize);
+
+ /**
+ * Returns the maximum size of the read buffer that the I/O processor
+ * allocates for each read. The I/O processor will not increase the
+ * read buffer size above this value.
+ */
+ int getMaxReadBufferSize();
+
+ /**
+ * Sets the maximum size of the read buffer that the I/O processor
+ * allocates for each read. The I/O processor will not increase the
+ * read buffer size above this value.
+ */
+ void setMaxReadBufferSize(int maxReadBufferSize);
+
/**
* Returns idle time for the specified type of idleness in seconds.
*/
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java?rev=576096&r1=576095&r2=576096&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java Sun Sep 16 05:54:44 2007
@@ -23,23 +23,17 @@
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
-import java.util.Queue;
+import java.util.Iterator;
import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
+import org.apache.mina.common.AbstractIoProcessor;
+import org.apache.mina.common.AbstractIoSession;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.ExceptionMonitor;
import org.apache.mina.common.FileRegion;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoProcessor;
-import org.apache.mina.common.IoService;
-import org.apache.mina.common.IoServiceListenerSupport;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.RuntimeIOException;
-import org.apache.mina.common.WriteRequest;
-import org.apache.mina.common.WriteTimeoutException;
-import org.apache.mina.util.NamePreservingRunnable;
/**
* Performs all I/O operations for sockets which is connected or bound. This class is used by MINA internally.
@@ -47,30 +41,13 @@
* @author The Apache MINA Project (dev@mina.apache.org)
* @version $Rev$, $Date$,
*/
-class SocketIoProcessor implements IoProcessor {
- private final Object lock = new Object();
-
- private final String threadName;
-
- private final Executor executor;
+class SocketIoProcessor extends AbstractIoProcessor {
private final Selector selector;
- private final Queue<SocketSessionImpl> newSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
-
- private final Queue<SocketSessionImpl> removingSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
-
- private final Queue<SocketSessionImpl> flushingSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
-
- private final Queue<SocketSessionImpl> trafficControllingSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
-
- private Worker worker;
-
- private long lastIdleCheckTime = System.currentTimeMillis();
-
SocketIoProcessor(String threadName, Executor executor) {
- this.threadName = threadName;
- this.executor = executor;
+ super(threadName, executor);
+
try {
this.selector = Selector.open();
} catch (IOException e) {
@@ -88,457 +65,106 @@
}
}
- public void add(IoSession session) {
- newSessions.add((SocketSessionImpl) session);
-
- startupWorker();
- }
-
- public void remove(IoSession session) {
- scheduleRemove((SocketSessionImpl) session);
- startupWorker();
+ protected int select(int timeout) throws Exception {
+ return selector.select(timeout); // honor the caller's timeout (Selector.select takes milliseconds) instead of a fixed 1000
}
- private void startupWorker() {
- synchronized (lock) {
- if (worker == null) {
- worker = new Worker();
- executor.execute(new NamePreservingRunnable(worker));
- }
- }
+ @Override
+ protected void wakeup() {
selector.wakeup();
}
- public void flush(IoSession session, WriteRequest writeRequest) {
- boolean needsWakeup = flushingSessions.isEmpty();
- if (scheduleFlush((SocketSessionImpl) session) && needsWakeup) {
- selector.wakeup();
- }
- }
-
- public void updateTrafficMask(IoSession session) {
- scheduleTrafficControl((SocketSessionImpl) session);
- selector.wakeup();
+ @Override
+ protected Iterator<AbstractIoSession> allSessions() throws Exception {
+ return new IoSessionIterator(selector.keys());
}
- private void scheduleRemove(SocketSessionImpl session) {
- removingSessions.add(session);
+ protected Iterator<AbstractIoSession> selectedSessions() throws Exception {
+ return new IoSessionIterator(selector.selectedKeys());
}
- private boolean scheduleFlush(SocketSessionImpl session) {
- if (session.setScheduledForFlush(true)) {
- flushingSessions.add(session);
-
- return true;
+ @Override
+ protected SessionState state(IoSession session) {
+ SelectionKey key = getSelectionKey(session);
+ if (key == null) {
+ return SessionState.PREPARING;
}
-
- return false;
+
+ return key.isValid()? SessionState.OPEN : SessionState.CLOSED;
}
- private void scheduleTrafficControl(SocketSessionImpl session) {
- trafficControllingSessions.add(session);
+ @Override
+ protected int readyOps(IoSession session) throws Exception {
+ return getSelectionKey(session).readyOps();
}
- private void doAddNew() {
- for (; ;) {
- SocketSessionImpl session = newSessions.poll();
-
- if (session == null) {
- break;
- }
-
- SocketChannel ch = session.getChannel();
- try {
- ch.configureBlocking(false);
- session.setSelectionKey(ch.register(selector,
- SelectionKey.OP_READ, session));
-
- // AbstractIoFilterChain.CONNECT_FUTURE is cleared inside here
- // in AbstractIoFilterChain.fireSessionOpened().
- getServiceListeners(session).fireSessionCreated(session);
- } catch (IOException e) {
- // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute
- // and call ConnectFuture.setException().
- session.getFilterChain().fireExceptionCaught(session, e);
- }
- }
+ @Override
+ protected int interestOps(IoSession session) throws Exception {
+ return getSelectionKey(session).interestOps();
}
- private IoServiceListenerSupport getServiceListeners(IoSession session) {
- IoService service = session.getService();
- if (service instanceof SocketAcceptor) {
- return ((SocketAcceptor) service).getListeners();
- } else {
- return ((SocketConnector) service).getListeners();
- }
+ @Override
+ protected void interestOps(IoSession session, int interestOps) throws Exception {
+ getSelectionKey(session).interestOps(interestOps);
}
- private void doRemove() {
- for (; ;) {
- SocketSessionImpl session = removingSessions.poll();
-
- if (session == null) {
- break;
- }
-
- SocketChannel ch = session.getChannel();
- SelectionKey key = session.getSelectionKey();
- // Retry later if session is not yet fully initialized.
- // (In case that Session.close() is called before addSession() is processed)
- if (key == null) {
- scheduleRemove(session);
- break;
- }
- // skip if channel is already closed
- if (!key.isValid()) {
- continue;
- }
-
- try {
- key.cancel();
- ch.close();
- } catch (IOException e) {
- session.getFilterChain().fireExceptionCaught(session, e);
- } finally {
- clearWriteRequestQueue(session);
- getServiceListeners(session).fireSessionDestroyed(session);
- }
- }
+ @Override
+ protected void doAdd(IoSession session) throws Exception {
+ SocketSessionImpl s = (SocketSessionImpl) session;
+ SocketChannel ch = s.getChannel();
+ ch.configureBlocking(false);
+ s.setSelectionKey(
+ ch.register(selector, SelectionKey.OP_READ, session));
}
-
- private void process(Set<SelectionKey> selectedKeys) {
- for (SelectionKey key : selectedKeys) {
- SocketSessionImpl session = (SocketSessionImpl) key.attachment();
-
- if (key.isReadable() && session.getTrafficMask().isReadable()) {
- read(session);
- }
-
- if (key.isWritable() && session.getTrafficMask().isWritable()) {
- scheduleFlush(session);
- }
- }
-
- selectedKeys.clear();
+
+ @Override
+ protected void doRemove(IoSession session) throws Exception {
+ SocketSessionImpl s = (SocketSessionImpl) session;
+ SocketChannel ch = s.getChannel();
+ SelectionKey key = s.getSelectionKey();
+ key.cancel();
+ ch.close();
}
- private void read(SocketSessionImpl session) {
- ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize());
- SocketChannel ch = session.getChannel();
-
- try {
- int readBytes = 0;
- int ret;
-
- try {
- while ((ret = ch.read(buf.buf())) > 0) {
- readBytes += ret;
- }
- } finally {
- buf.flip();
- }
-
- session.increaseReadBytes(readBytes);
-
- if (readBytes > 0) {
- session.getFilterChain().fireMessageReceived(session, buf);
- buf = null;
-
- if (readBytes * 2 < session.getReadBufferSize()) {
- if (session.getReadBufferSize() > 64) {
- session.setReadBufferSize(session.getReadBufferSize() >>> 1);
- }
- } else if (readBytes == session.getReadBufferSize()) {
- int newReadBufferSize = session.getReadBufferSize() << 1;
- if (newReadBufferSize <= (session.getConfig().getReceiveBufferSize() << 1)) {
- // read buffer size shouldn't get bigger than
- // twice of the receive buffer size because of
- // read-write fairness.
- session.setReadBufferSize(newReadBufferSize);
- }
- }
- }
- if (ret < 0) {
- scheduleRemove(session);
- }
- } catch (IOException e) {
- scheduleRemove(session);
- session.getFilterChain().fireExceptionCaught(session, e);
- } catch (Throwable e) {
- session.getFilterChain().fireExceptionCaught(session, e);
- }
+ @Override
+ protected int read(IoSession session, ByteBuffer buf) throws Exception {
+ return getChannel(session).read(buf.buf());
}
-
- private void notifyIdleness() {
- // process idle sessions
- long currentTime = System.currentTimeMillis();
- if ((currentTime - lastIdleCheckTime) >= 1000) {
- lastIdleCheckTime = currentTime;
- Set<SelectionKey> keys = selector.keys();
- if (keys != null) {
- for (SelectionKey key : keys) {
- SocketSessionImpl session = (SocketSessionImpl) key
- .attachment();
- notifyIdleness(session, currentTime);
- }
- }
- }
+
+ @Override
+ protected int write(IoSession session, ByteBuffer buf) throws Exception {
+ return getChannel(session).write(buf.buf());
}
-
- private void notifyIdleness(SocketSessionImpl session, long currentTime) {
- notifyIdleness0(session, currentTime, session
- .getConfig().getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
- IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session
- .getLastIdleTime(IdleStatus.BOTH_IDLE)));
- notifyIdleness0(session, currentTime, session
- .getConfig().getIdleTimeInMillis(IdleStatus.READER_IDLE),
- IdleStatus.READER_IDLE, Math.max(session.getLastReadTime(),
- session.getLastIdleTime(IdleStatus.READER_IDLE)));
- notifyIdleness0(session, currentTime, session
- .getConfig().getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
- IdleStatus.WRITER_IDLE, Math.max(session.getLastWriteTime(),
- session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
-
- notifyWriteTimeout(session, currentTime, session
- .getConfig().getWriteTimeoutInMillis(), session.getLastWriteTime());
- }
-
- private void notifyIdleness0(SocketSessionImpl session, long currentTime,
- long idleTime, IdleStatus status, long lastIoTime) {
- if (idleTime > 0 && lastIoTime != 0
- && (currentTime - lastIoTime) >= idleTime) {
- session.increaseIdleCount(status);
- session.getFilterChain().fireSessionIdle(session, status);
- }
+
+ @Override
+ protected long transferFile(IoSession session, FileRegion region) throws Exception {
+ return region.getFileChannel().transferTo(region.getPosition(), region.getCount(), getChannel(session));
}
- private void notifyWriteTimeout(SocketSessionImpl session,
- long currentTime, long writeTimeout, long lastIoTime) {
- SelectionKey key = session.getSelectionKey();
- if (writeTimeout > 0 && (currentTime - lastIoTime) >= writeTimeout
- && key != null && key.isValid()
- && (key.interestOps() & SelectionKey.OP_WRITE) != 0) {
- session.getFilterChain().fireExceptionCaught(session,
- new WriteTimeoutException());
- }
+ private SocketChannel getChannel(IoSession session) {
+ return ((SocketSessionImpl) session).getChannel();
}
-
- private void doFlush() {
- if (flushingSessions.size() == 0) {
- return;
- }
-
- for (; ;) {
- SocketSessionImpl session = flushingSessions.poll();
-
- if (session == null) {
- break;
- }
-
- session.setScheduledForFlush(false);
-
- if (!session.isConnected()) {
- clearWriteRequestQueue(session);
- continue;
- }
-
- SelectionKey key = session.getSelectionKey();
- // Retry later if session is not yet fully initialized.
- // (In case that Session.write() is called before addSession() is processed)
- if (key == null) {
- scheduleFlush(session);
- break;
- }
-
- // Skip if the channel is already closed.
- if (!key.isValid()) {
- continue;
- }
-
- try {
- boolean flushedAll = doFlush(session);
- if (flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.isScheduledForFlush()) {
- scheduleFlush(session);
- }
- } catch (IOException e) {
- scheduleRemove(session);
- session.getFilterChain().fireExceptionCaught(session, e);
- }
- }
+
+ private SelectionKey getSelectionKey(IoSession session) {
+ return ((SocketSessionImpl) session).getSelectionKey();
}
-
- private void clearWriteRequestQueue(SocketSessionImpl session) {
- Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
- WriteRequest req;
-
- if ((req = writeRequestQueue.poll()) != null) {
- Object m = req.getMessage();
- if (m instanceof ByteBuffer) {
- ByteBuffer buf = (ByteBuffer) req.getMessage();
-
- // The first unwritten empty buffer must be
- // forwarded to the filter chain.
- if (buf.hasRemaining()) {
- req.getFuture().setWritten(false);
- } else {
- session.getFilterChain().fireMessageSent(session, req);
- }
- } else {
- req.getFuture().setWritten(false);
- }
-
- // Discard others.
- while ((req = writeRequestQueue.poll()) != null) {
- req.getFuture().setWritten(false);
- }
+
+ private static class IoSessionIterator implements Iterator<AbstractIoSession> {
+ private final Iterator<SelectionKey> i;
+ private IoSessionIterator(Set<SelectionKey> keys) {
+ i = keys.iterator();
}
- }
-
- private boolean doFlush(SocketSessionImpl session) throws IOException {
- // Clear OP_WRITE
- SelectionKey key = session.getSelectionKey();
- key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
-
- SocketChannel ch = session.getChannel();
- Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
-
- int writtenBytes = 0;
- int maxWrittenBytes = session.getConfig().getSendBufferSize() << 1;
- try {
- do {
- // Check for pending writes.
- WriteRequest req = writeRequestQueue.peek();
-
- if (req == null) {
- break;
- }
-
- Object message = req.getMessage();
- if (message instanceof FileRegion) {
- FileRegion region = (FileRegion) message;
-
- if (region.getCount() <= 0) {
- // File has been sent, remove from queue
- writeRequestQueue.poll();
- session.increaseWrittenMessages();
- session.getFilterChain().fireMessageSent(session, req);
- continue;
- }
-
- if (key.isWritable()) {
- long localWrittenBytes =
- region.getFileChannel().transferTo(region.getPosition(), region.getCount(), ch);
- region.setPosition(region.getPosition() + localWrittenBytes);
- writtenBytes += localWrittenBytes;
- }
-
- if (region.getCount() > 0 || writtenBytes >= maxWrittenBytes) {
- // Kernel buffer is full or wrote too much.
- key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
- return false;
- }
-
- } else {
- ByteBuffer buf = (ByteBuffer) message;
- if (buf.remaining() == 0) {
- // Buffer has been completely sent, remove request form queue
- writeRequestQueue.poll();
-
- session.increaseWrittenMessages();
-
- buf.reset();
- session.getFilterChain().fireMessageSent(session, req);
- continue;
- }
-
- if (key.isWritable()) {
- writtenBytes += ch.write(buf.buf());
- }
-
- if (buf.hasRemaining() || writtenBytes >= maxWrittenBytes) {
- // Kernel buffer is full or wrote too much.
- key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
- return false;
- }
- }
- } while (writtenBytes < maxWrittenBytes);
- } finally {
- session.increaseWrittenBytes(writtenBytes);
+ public boolean hasNext() {
+ return i.hasNext();
}
- return true;
- }
-
- private void doUpdateTrafficMask() {
- for (; ;) {
- SocketSessionImpl session = trafficControllingSessions.poll();
-
- if (session == null) {
- break;
- }
-
- SelectionKey key = session.getSelectionKey();
- // Retry later if session is not yet fully initialized.
- // (In case that Session.suspend??() or session.resume??() is
- // called before addSession() is processed)
- if (key == null) {
- scheduleTrafficControl(session);
- break;
- }
- // skip if channel is already closed
- if (!key.isValid()) {
- continue;
- }
-
- // The normal is OP_READ and, if there are write requests in the
- // session's write queue, set OP_WRITE to trigger flushing.
- int ops = SelectionKey.OP_READ;
- if (!session.getWriteRequestQueue().isEmpty()) {
- ops |= SelectionKey.OP_WRITE;
- }
-
- // Now mask the preferred ops with the mask of the current session
- int mask = session.getTrafficMask().getInterestOps();
- key.interestOps(ops & mask);
+ public AbstractIoSession next() {
+ SelectionKey key = i.next();
+ return (AbstractIoSession) key.attachment();
}
- }
- private class Worker implements Runnable {
- public void run() {
- Thread.currentThread().setName(SocketIoProcessor.this.threadName);
-
- for (; ;) {
- try {
- int nKeys = selector.select(1000);
- doAddNew();
- doUpdateTrafficMask();
-
- if (nKeys > 0) {
- process(selector.selectedKeys());
- }
-
- doFlush();
- doRemove();
- notifyIdleness();
-
- if (selector.keys().isEmpty()) {
- synchronized (lock) {
- if (selector.keys().isEmpty() && newSessions.isEmpty()) {
- worker = null;
- break;
- }
- }
- }
- } catch (Throwable t) {
- ExceptionMonitor.getInstance().exceptionCaught(t);
-
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e1) {
- ExceptionMonitor.getInstance().exceptionCaught(e1);
- }
- }
- }
+ public void remove() {
+ i.remove();
}
}
}