You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2007/09/16 14:54:45 UTC

svn commit: r576096 - in /mina/trunk/core/src/main/java/org/apache/mina: common/AbstractIoProcessor.java common/AbstractIoSession.java common/AbstractIoSessionConfig.java common/IoSessionConfig.java transport/socket/nio/SocketIoProcessor.java

Author: trustin
Date: Sun Sep 16 05:54:44 2007
New Revision: 576096

URL: http://svn.apache.org/viewvc?rev=576096&view=rev
Log:
* Added AbstractIoProcessor; extracted from SocketIoProcessor
* Rewrote SocketIoProcessor to use AbstractIoProcessor
* Added IoSessionConfig properties
** readBufferSize
** minReadBufferSize
** maxReadBufferSize


Added:
    mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java   (with props)
Modified:
    mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
    mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSessionConfig.java
    mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionConfig.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java

Added: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java?rev=576096&view=auto
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java Sun Sep 16 05:54:44 2007
@@ -0,0 +1,552 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.mina.common;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+
+import org.apache.mina.util.NamePreservingRunnable;
+
+/**
+ * An abstract implementation of {@link IoProcessor} which helps
+ * transport developers to write an {@link IoProcessor} easily.
+ * 
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev$, $Date$
+ * 
+ * TODO Provide abstraction for bind, unbind and connect (+ cancellation)
+ */
+public abstract class AbstractIoProcessor implements IoProcessor {
+
+    private final Object lock = new Object();
+    private final String threadName;
+    private final Executor executor;
+
+    private final Queue<AbstractIoSession> newSessions =
+        new ConcurrentLinkedQueue<AbstractIoSession>();
+    private final Queue<AbstractIoSession> removingSessions = 
+        new ConcurrentLinkedQueue<AbstractIoSession>();
+    private final Queue<AbstractIoSession> flushingSessions = 
+        new ConcurrentLinkedQueue<AbstractIoSession>();
+    private final Queue<AbstractIoSession> trafficControllingSessions = 
+        new ConcurrentLinkedQueue<AbstractIoSession>();
+
+    private Worker worker;
+    private long lastIdleCheckTime;
+
+    protected AbstractIoProcessor(String threadName, Executor executor) {
+        this.threadName = threadName;
+        this.executor = executor;
+    }
+    
+    protected abstract int select(int timeout) throws Exception;
+
+    protected abstract void wakeup();
+    
+    protected abstract Iterator<AbstractIoSession> allSessions() throws Exception;
+
+    protected abstract Iterator<AbstractIoSession> selectedSessions() throws Exception;
+
+    protected abstract SessionState state(IoSession session);
+
+    protected abstract int readyOps(IoSession session) throws Exception;
+
+    protected abstract int interestOps(IoSession session) throws Exception;
+
+    protected abstract void interestOps(IoSession session, int interestOps) throws Exception;
+
+    protected abstract void doAdd(IoSession session) throws Exception;
+
+    protected abstract void doRemove(IoSession session) throws Exception;
+
+    protected abstract int read(IoSession session, ByteBuffer buf) throws Exception;
+
+    protected abstract int write(IoSession session, ByteBuffer buf) throws Exception;
+
+    protected abstract long transferFile(IoSession session, FileRegion region) throws Exception;
+
+    public void add(IoSession session) {
+        newSessions.add((AbstractIoSession) session);
+        startupWorker();
+    }
+
+    public void remove(IoSession session) {
+        scheduleRemove((AbstractIoSession) session);
+        startupWorker();
+    }
+
+    public void flush(IoSession session, WriteRequest writeRequest) {
+        boolean needsWakeup = flushingSessions.isEmpty();
+        if (scheduleFlush((AbstractIoSession) session) && needsWakeup) {
+            wakeup();
+        }
+    }
+
+    public void updateTrafficMask(IoSession session) {
+        scheduleTrafficControl((AbstractIoSession) session);
+        wakeup();
+    }
+
+    private void startupWorker() {
+        synchronized (lock) {
+            if (worker == null) {
+                worker = new Worker();
+                executor.execute(new NamePreservingRunnable(worker));
+            }
+        }
+        wakeup();
+    }
+
+    private void scheduleRemove(AbstractIoSession session) {
+        removingSessions.add(session);
+    }
+
+    private boolean scheduleFlush(AbstractIoSession session) {
+        if (session.setScheduledForFlush(true)) {
+            flushingSessions.add(session);
+            return true;
+        }
+        return false;
+    }
+
+    private void scheduleTrafficControl(AbstractIoSession session) {
+        trafficControllingSessions.add(session);
+    }
+
+    private int add() {
+        int addedSessions = 0;
+        for (; ;) {
+            AbstractIoSession session = newSessions.poll();
+
+            if (session == null) {
+                break;
+            }
+
+            try {
+                doAdd(session);
+                addedSessions ++;
+                
+                // AbstractIoFilterChain.CONNECT_FUTURE is cleared inside here
+                // in AbstractIoFilterChain.fireSessionOpened().
+                ((AbstractIoService) session.getService()).getListeners().fireSessionCreated(session);
+            } catch (Exception e) {
+                // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute
+                // and call ConnectFuture.setException().
+                session.getFilterChain().fireExceptionCaught(session, e);
+            }
+        }
+        
+        return addedSessions;
+    }
+    
+    private int remove() {
+        int removedSessions = 0;
+        for (; ;) {
+            AbstractIoSession session = removingSessions.poll();
+
+            if (session == null) {
+                break;
+            }
+            
+            SessionState state = state(session);
+            switch (state) {
+            case OPEN:
+                try {
+                    doRemove(session);
+                    removedSessions ++;
+                } catch (Exception e) {
+                    session.getFilterChain().fireExceptionCaught(session, e);
+                } finally {
+                    clearWriteRequestQueue(session);
+                    ((AbstractIoService) session.getService()).getListeners().fireSessionDestroyed(session);
+                }
+                break;
+            case CLOSED:
+                // Skip if channel is already closed
+                break;
+            case PREPARING:
+                // Retry later if session is not yet fully initialized.
+                // (In case that Session.close() is called before addSession() is processed)
+                scheduleRemove(session);
+                return removedSessions;
+            default:
+                throw new IllegalStateException(String.valueOf(state));
+            }
+        }
+        
+        return removedSessions;
+    }
+    
+    private void process() throws Exception {
+        for (Iterator<AbstractIoSession> i = selectedSessions(); i.hasNext();) {
+            process(i.next());
+            i.remove();
+        }
+    }
+    
+    private void process(AbstractIoSession session) throws Exception {
+        if (((readyOps(session) & SelectionKey.OP_READ) != 0) && session.getTrafficMask().isReadable()) {
+            read(session);
+        }
+
+        if (((readyOps(session) & SelectionKey.OP_WRITE) != 0) && session.getTrafficMask().isWritable()) {
+            scheduleFlush(session);
+        }
+    }
+    
+    private void read(AbstractIoSession session) {
+        IoSessionConfig config = session.getConfig();
+        ByteBuffer buf = ByteBuffer.allocate(config.getReadBufferSize());
+
+        try {
+            int readBytes = 0;
+            int ret;
+
+            try {
+                while ((ret = read(session, buf)) > 0) {
+                    readBytes += ret;
+                }
+            } finally {
+                buf.flip();
+            }
+
+            session.increaseReadBytes(readBytes);
+
+            if (readBytes > 0) {
+                session.getFilterChain().fireMessageReceived(session, buf);
+                buf = null;
+                
+                if (readBytes * 2 < config.getReadBufferSize()) {
+                    if (config.getReadBufferSize() > config.getMinReadBufferSize()) {
+                        config.setReadBufferSize(config.getReadBufferSize() >>> 1);
+                    }
+                } else if (readBytes == config.getReadBufferSize()) {
+                    int newReadBufferSize = config.getReadBufferSize() << 1;
+                    if (newReadBufferSize <= (config.getMaxReadBufferSize())) {
+                        config.setReadBufferSize(newReadBufferSize);
+                    } else {
+                        config.setReadBufferSize(config.getMaxReadBufferSize());
+                    }
+                }
+            }
+            if (ret < 0) {
+                scheduleRemove(session);
+            }
+        } catch (IOException e) {
+            scheduleRemove(session);
+            session.getFilterChain().fireExceptionCaught(session, e);
+        } catch (Throwable e) {
+            session.getFilterChain().fireExceptionCaught(session, e);
+        }
+    }
+    
+    private void notifyIdleness() throws Exception {
+        // process idle sessions
+        long currentTime = System.currentTimeMillis();
+        if ((currentTime - lastIdleCheckTime) >= 1000) {
+            lastIdleCheckTime = currentTime;
+            for (Iterator<AbstractIoSession> i = allSessions(); i.hasNext();) {
+                AbstractIoSession session = i.next();
+                try {
+                    notifyIdleness(session, currentTime);
+                } catch (Exception e) {
+                    session.getFilterChain().fireExceptionCaught(session, e);
+                }
+            }
+        }
+    }
+
+    private void notifyIdleness(AbstractIoSession session, long currentTime) throws Exception {
+        notifyIdleness0(session, currentTime, session
+                .getConfig().getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
+                IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session
+                .getLastIdleTime(IdleStatus.BOTH_IDLE)));
+        notifyIdleness0(session, currentTime, session
+                .getConfig().getIdleTimeInMillis(IdleStatus.READER_IDLE),
+                IdleStatus.READER_IDLE, Math.max(session.getLastReadTime(),
+                session.getLastIdleTime(IdleStatus.READER_IDLE)));
+        notifyIdleness0(session, currentTime, session
+                .getConfig().getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
+                IdleStatus.WRITER_IDLE, Math.max(session.getLastWriteTime(),
+                session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
+
+        notifyWriteTimeout(session, currentTime, session
+                .getConfig().getWriteTimeoutInMillis(), session.getLastWriteTime());
+    }
+
+    private void notifyIdleness0(AbstractIoSession session, long currentTime,
+                                 long idleTime, IdleStatus status, long lastIoTime) {
+        if (idleTime > 0 && lastIoTime != 0
+                && (currentTime - lastIoTime) >= idleTime) {
+            session.increaseIdleCount(status);
+            session.getFilterChain().fireSessionIdle(session, status);
+        }
+    }
+
+    private void notifyWriteTimeout(AbstractIoSession session,
+                                    long currentTime, long writeTimeout, long lastIoTime) throws Exception {
+        if (writeTimeout > 0 && (currentTime - lastIoTime) >= writeTimeout
+                && (interestOps(session) & SelectionKey.OP_WRITE) != 0) {
+            session.getFilterChain().fireExceptionCaught(session,
+                    new WriteTimeoutException());
+        }
+    }
+
+    private void flush() {
+        if (flushingSessions.size() == 0) {
+            return;
+        }
+
+        for (; ;) {
+            AbstractIoSession session = flushingSessions.poll();
+
+            if (session == null) {
+                break;
+            }
+
+            session.setScheduledForFlush(false);
+
+            if (!session.isConnected()) {
+                clearWriteRequestQueue(session);
+                continue;
+            }
+            
+            SessionState state = state(session);
+            switch (state) {
+            case OPEN:
+                try {
+                    boolean flushedAll = flush(session);
+                    if (flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.isScheduledForFlush()) {
+                        scheduleFlush(session);
+                    }
+                } catch (Exception e) {
+                    scheduleRemove(session);
+                    session.getFilterChain().fireExceptionCaught(session, e);
+                }
+                break;
+            case CLOSED:
+                // Skip if the channel is already closed.
+                break;
+            case PREPARING:
+                // Retry later if session is not yet fully initialized.
+                // (In case that Session.write() is called before addSession() is processed)
+                scheduleFlush(session);
+                return;
+            default:
+                throw new IllegalStateException(String.valueOf(state));
+            }
+        }
+    }
+    
+    private void clearWriteRequestQueue(AbstractIoSession session) {
+        Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
+        WriteRequest req;
+
+        if ((req = writeRequestQueue.poll()) != null) {
+            Object m = req.getMessage();
+            if (m instanceof ByteBuffer) {
+                ByteBuffer buf = (ByteBuffer) req.getMessage();
+
+                // The first unwritten empty buffer must be
+                // forwarded to the filter chain.
+                if (buf.hasRemaining()) {
+                    req.getFuture().setWritten(false);
+                } else {
+                    session.getFilterChain().fireMessageSent(session, req);
+                }
+            } else {
+                req.getFuture().setWritten(false);
+            }
+
+            // Discard others.
+            while ((req = writeRequestQueue.poll()) != null) {
+                req.getFuture().setWritten(false);
+            }
+        }
+    }
+    
+    private boolean flush(AbstractIoSession session) throws Exception {
+        // Clear OP_WRITE
+        interestOps(session, interestOps(session) & (~SelectionKey.OP_WRITE));
+
+        Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
+
+        // Set limitation for the number of written bytes for read-write
+        // fairness.
+        int maxWrittenBytes = session.getConfig().getMaxReadBufferSize();
+        int writtenBytes = 0;
+        
+        try {
+            do {
+                // Check for pending writes.
+                WriteRequest req = writeRequestQueue.peek();
+
+                if (req == null) {
+                    break;
+                }
+
+                Object message = req.getMessage();
+                if (message instanceof FileRegion) {
+                    FileRegion region = (FileRegion) message;
+
+                    if (region.getCount() <= 0) {
+                        // File has been sent, remove from queue
+                        writeRequestQueue.poll();
+                        session.increaseWrittenMessages();
+                        session.getFilterChain().fireMessageSent(session, req);
+                        continue;
+                    }
+
+                    if ((readyOps(session) & SelectionKey.OP_WRITE) != 0) {
+                        long localWrittenBytes = transferFile(session, region);
+                        region.setPosition(region.getPosition() + localWrittenBytes);
+                        writtenBytes += localWrittenBytes;
+                    }
+
+                    if (region.getCount() > 0 || writtenBytes >= maxWrittenBytes) {
+                        // Kernel buffer is full or wrote too much.
+                        interestOps(session, interestOps(session) | SelectionKey.OP_WRITE);
+                        return false;
+                    }
+
+                } else {
+                    ByteBuffer buf = (ByteBuffer) message;
+                    if (buf.remaining() == 0) {
+                        // Buffer has been completely sent, remove request form queue
+                        writeRequestQueue.poll();
+
+                        session.increaseWrittenMessages();
+
+                        buf.reset();
+                        session.getFilterChain().fireMessageSent(session, req);
+                        continue;
+                    }
+
+                    if ((readyOps(session) & SelectionKey.OP_WRITE) != 0) {
+                        writtenBytes += write(session, buf);
+                    }
+
+                    if (buf.hasRemaining() || writtenBytes >= maxWrittenBytes) {
+                        // Kernel buffer is full or wrote too much.
+                        interestOps(session, interestOps(session) | SelectionKey.OP_WRITE);
+                        return false;
+                    }
+                }
+            } while (writtenBytes < maxWrittenBytes);
+        } finally {
+            session.increaseWrittenBytes(writtenBytes);
+        }
+
+        return true;
+    }
+    
+    private void updateTrafficMask() {
+        for (; ;) {
+            AbstractIoSession session = trafficControllingSessions.poll();
+
+            if (session == null) {
+                break;
+            }
+            
+            SessionState state = state(session);
+            switch (state) {
+            case OPEN:
+                // The normal is OP_READ and, if there are write requests in the
+                // session's write queue, set OP_WRITE to trigger flushing.
+                int ops = SelectionKey.OP_READ;
+                if (!session.getWriteRequestQueue().isEmpty()) {
+                    ops |= SelectionKey.OP_WRITE;
+                }
+
+                // Now mask the preferred ops with the mask of the current session
+                int mask = session.getTrafficMask().getInterestOps();
+                try {
+                    interestOps(session, ops & mask);
+                } catch (Exception e) {
+                    session.getFilterChain().fireExceptionCaught(session, e);
+                }
+                break;
+            case CLOSED:
+                break;
+            case PREPARING:
+                // Retry later if session is not yet fully initialized.
+                // (In case that Session.suspend??() or session.resume??() is
+                // called before addSession() is processed)
+                scheduleTrafficControl(session);
+                return;
+            default:
+                throw new IllegalStateException(String.valueOf(state));
+            }
+        }
+    }
+    
+    private class Worker implements Runnable {
+        public void run() {
+            int nSessions = 0;
+            
+            Thread.currentThread().setName(AbstractIoProcessor.this.threadName);
+            lastIdleCheckTime = System.currentTimeMillis();
+
+            for (;;) {
+                try {
+                    int nKeys = select(1000);
+                    
+                    nSessions += add();
+                    updateTrafficMask();
+
+                    if (nKeys > 0) {
+                        process();
+                    }
+
+                    flush();
+                    nSessions -= remove();
+                    notifyIdleness();
+
+                    if (nSessions == 0) {
+                        synchronized (lock) {
+                            if (newSessions.isEmpty()) {
+                                worker = null;
+                                break;
+                            }
+                        }
+                    }
+                } catch (Throwable t) {
+                    ExceptionMonitor.getInstance().exceptionCaught(t);
+
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e1) {
+                        ExceptionMonitor.getInstance().exceptionCaught(e1);
+                    }
+                }
+            }
+        }
+    }
+    
+    protected static enum SessionState {
+        OPEN,
+        CLOSED,
+        PREPARING,
+    }
+}

Propchange: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java?rev=576096&r1=576095&r2=576096&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java Sun Sep 16 05:54:44 2007
@@ -68,17 +68,17 @@
      */
     private final CloseFuture closeFuture = new DefaultCloseFuture(this);
 
+    private volatile boolean closing;
+
+    private TrafficMask trafficMask = TrafficMask.ALL;
+
+    // Status variables
     private final AtomicBoolean scheduledForFlush = new AtomicBoolean();
     
     private final AtomicLong scheduledWriteBytes = new AtomicLong();
 
     private final AtomicInteger scheduledWriteMessages = new AtomicInteger();
 
-    private volatile boolean closing;
-
-    private TrafficMask trafficMask = TrafficMask.ALL;
-
-    // Status variables
     private long readBytes;
 
     private long writtenBytes;

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSessionConfig.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSessionConfig.java?rev=576096&r1=576095&r2=576096&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSessionConfig.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSessionConfig.java Sun Sep 16 05:54:44 2007
@@ -28,6 +28,9 @@
  */
 public abstract class AbstractIoSessionConfig implements IoSessionConfig {
 
+    private int minReadBufferSize = 64;
+    private int readBufferSize = 2048;
+    private int maxReadBufferSize = 65536;
     private int idleTimeForRead;
     private int idleTimeForWrite;
     private int idleTimeForBoth;
@@ -41,6 +44,9 @@
             throw new NullPointerException("config");
         }
         
+        setReadBufferSize(config.getReadBufferSize());
+        setMinReadBufferSize(config.getMinReadBufferSize());
+        setMaxReadBufferSize(config.getMaxReadBufferSize());
         setIdleTime(IdleStatus.BOTH_IDLE, config.getIdleTime(IdleStatus.BOTH_IDLE));
         setIdleTime(IdleStatus.READER_IDLE, config.getIdleTime(IdleStatus.READER_IDLE));
         setIdleTime(IdleStatus.WRITER_IDLE, config.getIdleTime(IdleStatus.WRITER_IDLE));
@@ -54,6 +60,48 @@
      * properties retrieved from the specified <tt>config</tt>.
      */
     protected abstract void doSetAll(IoSessionConfig config);
+
+    public int getReadBufferSize() {
+        return readBufferSize;
+    }
+    
+    public void setReadBufferSize(int readBufferSize) {
+        if (readBufferSize <= 0) {
+            throw new IllegalArgumentException("readBufferSize: " + readBufferSize + " (expected: 1+)");
+        }
+        this.readBufferSize = readBufferSize;
+    }
+
+    public int getMinReadBufferSize() {
+        return minReadBufferSize;
+    }
+    
+    public void setMinReadBufferSize(int minReadBufferSize) {
+        if (minReadBufferSize <= 0) {
+            throw new IllegalArgumentException("minReadBufferSize: " + minReadBufferSize + " (expected: 1+)");
+        }
+        if (minReadBufferSize > maxReadBufferSize ) {
+            throw new IllegalArgumentException("minReadBufferSize: " + minReadBufferSize + " (expected: smaller than " + maxReadBufferSize + ')');
+            
+        }
+        this.minReadBufferSize = minReadBufferSize;
+    }
+
+    public int getMaxReadBufferSize() {
+        return maxReadBufferSize;
+    }
+    
+    public void setMaxReadBufferSize(int maxReadBufferSize) {
+        if (maxReadBufferSize <= 0) {
+            throw new IllegalArgumentException("maxReadBufferSize: " + maxReadBufferSize + " (expected: 1+)");
+        }
+        
+        if (maxReadBufferSize < minReadBufferSize) {
+            throw new IllegalArgumentException("maxReadBufferSize: " + maxReadBufferSize + " (expected: greater than " + minReadBufferSize + ')');
+            
+        }
+        this.maxReadBufferSize = maxReadBufferSize;
+    }
 
     public int getIdleTime(IdleStatus status) {
         if (status == IdleStatus.BOTH_IDLE) {

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionConfig.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionConfig.java?rev=576096&r1=576095&r2=576096&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionConfig.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionConfig.java Sun Sep 16 05:54:44 2007
@@ -26,6 +26,49 @@
  * @version $Rev$, $Date$
  */
 public interface IoSessionConfig {
+    
+    /**
+     * Returns the size of the read buffer that I/O processor allocates
+     * per each read.  It's unusual to adjust this property because
+     * it's often adjusted automatically by the I/O processor.
+     */
+    int getReadBufferSize();
+
+    /**
+     * Sets the size of the read buffer that I/O processor allocates
+     * per each read.  It's unusual to adjust this property because
+     * it's often adjusted automatically by the I/O processor.
+     */
+    void setReadBufferSize(int readBufferSize);
+    
+    /**
+     * Returns the minimum size of the read buffer that I/O processor
+     * allocates per each read.  I/O processor will not decrease the
+     * read buffer size to the smaller value than this property value.
+     */
+    int getMinReadBufferSize();
+
+    /**
+     * Sets the minimum size of the read buffer that I/O processor
+     * allocates per each read.  I/O processor will not decrease the
+     * read buffer size to the smaller value than this property value.
+     */
+    void setMinReadBufferSize(int minReadBufferSize);
+    
+    /**
+     * Returns the maximum size of the read buffer that I/O processor
+     * allocates per each read.  I/O processor will not increase the
+     * read buffer size to the greater value than this property value.
+     */
+    int getMaxReadBufferSize();
+
+    /**
+     * Sets the maximum size of the read buffer that I/O processor
+     * allocates per each read.  I/O processor will not increase the
+     * read buffer size to the greater value than this property value.
+     */
+    void setMaxReadBufferSize(int maxReadBufferSize);
+    
     /**
      * Returns idle time for the specified type of idleness in seconds.
      */

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java?rev=576096&r1=576095&r2=576096&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java Sun Sep 16 05:54:44 2007
@@ -23,23 +23,17 @@
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
-import java.util.Queue;
+import java.util.Iterator;
 import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 
+import org.apache.mina.common.AbstractIoProcessor;
+import org.apache.mina.common.AbstractIoSession;
 import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.ExceptionMonitor;
 import org.apache.mina.common.FileRegion;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoProcessor;
-import org.apache.mina.common.IoService;
-import org.apache.mina.common.IoServiceListenerSupport;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.RuntimeIOException;
-import org.apache.mina.common.WriteRequest;
-import org.apache.mina.common.WriteTimeoutException;
-import org.apache.mina.util.NamePreservingRunnable;
 
 /**
  * Performs all I/O operations for sockets which is connected or bound. This class is used by MINA internally.
@@ -47,30 +41,13 @@
  * @author The Apache MINA Project (dev@mina.apache.org)
  * @version $Rev$, $Date$,
  */
-class SocketIoProcessor implements IoProcessor {
-    private final Object lock = new Object();
-
-    private final String threadName;
-
-    private final Executor executor;
+class SocketIoProcessor extends AbstractIoProcessor {
 
     private final Selector selector;
 
-    private final Queue<SocketSessionImpl> newSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
-
-    private final Queue<SocketSessionImpl> removingSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
-
-    private final Queue<SocketSessionImpl> flushingSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
-
-    private final Queue<SocketSessionImpl> trafficControllingSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
-
-    private Worker worker;
-
-    private long lastIdleCheckTime = System.currentTimeMillis();
-
     SocketIoProcessor(String threadName, Executor executor) {
-        this.threadName = threadName;
-        this.executor = executor;
+        super(threadName, executor);
+
         try {
             this.selector = Selector.open();
         } catch (IOException e) {
@@ -88,457 +65,106 @@
         }
     }
 
-    public void add(IoSession session) {
-        newSessions.add((SocketSessionImpl) session);
-
-        startupWorker();
-    }
-
-    public void remove(IoSession session) {
-        scheduleRemove((SocketSessionImpl) session);
-        startupWorker();
+    protected int select(int timeout) throws Exception {
+        return selector.select(1000);
     }
 
-    private void startupWorker() {
-        synchronized (lock) {
-            if (worker == null) {
-                worker = new Worker();
-                executor.execute(new NamePreservingRunnable(worker));
-            }
-        }
+    @Override
+    protected void wakeup() {
         selector.wakeup();
     }
 
-    public void flush(IoSession session, WriteRequest writeRequest) {
-        boolean needsWakeup = flushingSessions.isEmpty();
-        if (scheduleFlush((SocketSessionImpl) session) && needsWakeup) {
-            selector.wakeup();
-        }
-    }
-
-    public void updateTrafficMask(IoSession session) {
-        scheduleTrafficControl((SocketSessionImpl) session);
-        selector.wakeup();
+    @Override
+    protected Iterator<AbstractIoSession> allSessions() throws Exception {
+        return new IoSessionIterator(selector.keys());
     }
 
-    private void scheduleRemove(SocketSessionImpl session) {
-        removingSessions.add(session);
+    protected Iterator<AbstractIoSession> selectedSessions() throws Exception {
+        return new IoSessionIterator(selector.selectedKeys());
     }
 
-    private boolean scheduleFlush(SocketSessionImpl session) {
-        if (session.setScheduledForFlush(true)) {
-            flushingSessions.add(session);
-
-            return true;
+    @Override
+    protected SessionState state(IoSession session) {
+        SelectionKey key = getSelectionKey(session);
+        if (key == null) {
+            return SessionState.PREPARING;
         }
-
-        return false;
+        
+        return key.isValid()? SessionState.OPEN : SessionState.CLOSED;
     }
 
-    private void scheduleTrafficControl(SocketSessionImpl session) {
-        trafficControllingSessions.add(session);
+    @Override
+    protected int readyOps(IoSession session) throws Exception {
+        return getSelectionKey(session).readyOps();
     }
 
-    private void doAddNew() {
-        for (; ;) {
-            SocketSessionImpl session = newSessions.poll();
-
-            if (session == null) {
-                break;
-            }
-
-            SocketChannel ch = session.getChannel();
-            try {
-                ch.configureBlocking(false);
-                session.setSelectionKey(ch.register(selector,
-                        SelectionKey.OP_READ, session));
-
-                // AbstractIoFilterChain.CONNECT_FUTURE is cleared inside here
-                // in AbstractIoFilterChain.fireSessionOpened().
-                getServiceListeners(session).fireSessionCreated(session);
-            } catch (IOException e) {
-                // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute
-                // and call ConnectFuture.setException().
-                session.getFilterChain().fireExceptionCaught(session, e);
-            }
-        }
+    @Override
+    protected int interestOps(IoSession session) throws Exception {
+        return getSelectionKey(session).interestOps();
     }
 
-    private IoServiceListenerSupport getServiceListeners(IoSession session) {
-        IoService service = session.getService();
-        if (service instanceof SocketAcceptor) {
-            return ((SocketAcceptor) service).getListeners();
-        } else {
-            return ((SocketConnector) service).getListeners();
-        }
+    @Override
+    protected void interestOps(IoSession session, int interestOps) throws Exception {
+        getSelectionKey(session).interestOps(interestOps);
     }
 
-    private void doRemove() {
-        for (; ;) {
-            SocketSessionImpl session = removingSessions.poll();
-
-            if (session == null) {
-                break;
-            }
-
-            SocketChannel ch = session.getChannel();
-            SelectionKey key = session.getSelectionKey();
-            // Retry later if session is not yet fully initialized.
-            // (In case that Session.close() is called before addSession() is processed)
-            if (key == null) {
-                scheduleRemove(session);
-                break;
-            }
-            // skip if channel is already closed
-            if (!key.isValid()) {
-                continue;
-            }
-
-            try {
-                key.cancel();
-                ch.close();
-            } catch (IOException e) {
-                session.getFilterChain().fireExceptionCaught(session, e);
-            } finally {
-                clearWriteRequestQueue(session);
-                getServiceListeners(session).fireSessionDestroyed(session);
-            }
-        }
+    @Override
+    protected void doAdd(IoSession session) throws Exception {
+        SocketSessionImpl s = (SocketSessionImpl) session;
+        SocketChannel ch = s.getChannel();
+        ch.configureBlocking(false);
+        s.setSelectionKey(
+                ch.register(selector, SelectionKey.OP_READ, session));
     }
-
-    private void process(Set<SelectionKey> selectedKeys) {
-        for (SelectionKey key : selectedKeys) {
-            SocketSessionImpl session = (SocketSessionImpl) key.attachment();
-
-            if (key.isReadable() && session.getTrafficMask().isReadable()) {
-                read(session);
-            }
-
-            if (key.isWritable() && session.getTrafficMask().isWritable()) {
-                scheduleFlush(session);
-            }
-        }
-
-        selectedKeys.clear();
+    
+    @Override
+    protected void doRemove(IoSession session) throws Exception {
+        SocketSessionImpl s = (SocketSessionImpl) session;
+        SocketChannel ch = s.getChannel();
+        SelectionKey key = s.getSelectionKey();
+        key.cancel();
+        ch.close();
     }
 
-    private void read(SocketSessionImpl session) {
-        ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize());
-        SocketChannel ch = session.getChannel();
-
-        try {
-            int readBytes = 0;
-            int ret;
-
-            try {
-                while ((ret = ch.read(buf.buf())) > 0) {
-                    readBytes += ret;
-                }
-            } finally {
-                buf.flip();
-            }
-
-            session.increaseReadBytes(readBytes);
-
-            if (readBytes > 0) {
-                session.getFilterChain().fireMessageReceived(session, buf);
-                buf = null;
-
-                if (readBytes * 2 < session.getReadBufferSize()) {
-                    if (session.getReadBufferSize() > 64) {
-                        session.setReadBufferSize(session.getReadBufferSize() >>> 1);
-                    }
-                } else if (readBytes == session.getReadBufferSize()) {
-                    int newReadBufferSize = session.getReadBufferSize() << 1;
-                    if (newReadBufferSize <= (session.getConfig().getReceiveBufferSize() << 1)) {
-                        // read buffer size shouldn't get bigger than
-                        // twice of the receive buffer size because of
-                        // read-write fairness.
-                        session.setReadBufferSize(newReadBufferSize);
-                    }
-                }
-            }
-            if (ret < 0) {
-                scheduleRemove(session);
-            }
-        } catch (IOException e) {
-            scheduleRemove(session);
-            session.getFilterChain().fireExceptionCaught(session, e);
-        } catch (Throwable e) {
-            session.getFilterChain().fireExceptionCaught(session, e);
-        }
+    @Override
+    protected int read(IoSession session, ByteBuffer buf) throws Exception {
+        return getChannel(session).read(buf.buf());
     }
-
-    private void notifyIdleness() {
-        // process idle sessions
-        long currentTime = System.currentTimeMillis();
-        if ((currentTime - lastIdleCheckTime) >= 1000) {
-            lastIdleCheckTime = currentTime;
-            Set<SelectionKey> keys = selector.keys();
-            if (keys != null) {
-                for (SelectionKey key : keys) {
-                    SocketSessionImpl session = (SocketSessionImpl) key
-                            .attachment();
-                    notifyIdleness(session, currentTime);
-                }
-            }
-        }
+    
+    @Override
+    protected int write(IoSession session, ByteBuffer buf) throws Exception {
+        return getChannel(session).write(buf.buf());
     }
-
-    private void notifyIdleness(SocketSessionImpl session, long currentTime) {
-        notifyIdleness0(session, currentTime, session
-                .getConfig().getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
-                IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session
-                .getLastIdleTime(IdleStatus.BOTH_IDLE)));
-        notifyIdleness0(session, currentTime, session
-                .getConfig().getIdleTimeInMillis(IdleStatus.READER_IDLE),
-                IdleStatus.READER_IDLE, Math.max(session.getLastReadTime(),
-                session.getLastIdleTime(IdleStatus.READER_IDLE)));
-        notifyIdleness0(session, currentTime, session
-                .getConfig().getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
-                IdleStatus.WRITER_IDLE, Math.max(session.getLastWriteTime(),
-                session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
-
-        notifyWriteTimeout(session, currentTime, session
-                .getConfig().getWriteTimeoutInMillis(), session.getLastWriteTime());
-    }
-
-    private void notifyIdleness0(SocketSessionImpl session, long currentTime,
-                                 long idleTime, IdleStatus status, long lastIoTime) {
-        if (idleTime > 0 && lastIoTime != 0
-                && (currentTime - lastIoTime) >= idleTime) {
-            session.increaseIdleCount(status);
-            session.getFilterChain().fireSessionIdle(session, status);
-        }
+    
+    @Override
+    protected long transferFile(IoSession session, FileRegion region) throws Exception {
+        return region.getFileChannel().transferTo(region.getPosition(), region.getCount(), getChannel(session));
     }
 
-    private void notifyWriteTimeout(SocketSessionImpl session,
-                                    long currentTime, long writeTimeout, long lastIoTime) {
-        SelectionKey key = session.getSelectionKey();
-        if (writeTimeout > 0 && (currentTime - lastIoTime) >= writeTimeout
-                && key != null && key.isValid()
-                && (key.interestOps() & SelectionKey.OP_WRITE) != 0) {
-            session.getFilterChain().fireExceptionCaught(session,
-                    new WriteTimeoutException());
-        }
+    private SocketChannel getChannel(IoSession session) {
+        return ((SocketSessionImpl) session).getChannel();
     }
-
-    private void doFlush() {
-        if (flushingSessions.size() == 0) {
-            return;
-        }
-
-        for (; ;) {
-            SocketSessionImpl session = flushingSessions.poll();
-
-            if (session == null) {
-                break;
-            }
-
-            session.setScheduledForFlush(false);
-
-            if (!session.isConnected()) {
-                clearWriteRequestQueue(session);
-                continue;
-            }
-
-            SelectionKey key = session.getSelectionKey();
-            // Retry later if session is not yet fully initialized.
-            // (In case that Session.write() is called before addSession() is processed)
-            if (key == null) {
-                scheduleFlush(session);
-                break;
-            }
-
-            // Skip if the channel is already closed.
-            if (!key.isValid()) {
-                continue;
-            }
-
-            try {
-                boolean flushedAll = doFlush(session);
-                if (flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.isScheduledForFlush()) {
-                    scheduleFlush(session);
-                }
-            } catch (IOException e) {
-                scheduleRemove(session);
-                session.getFilterChain().fireExceptionCaught(session, e);
-            }
-        }
+    
+    private SelectionKey getSelectionKey(IoSession session) {
+        return ((SocketSessionImpl) session).getSelectionKey();
     }
-
-    private void clearWriteRequestQueue(SocketSessionImpl session) {
-        Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
-        WriteRequest req;
-
-        if ((req = writeRequestQueue.poll()) != null) {
-            Object m = req.getMessage();
-            if (m instanceof ByteBuffer) {
-                ByteBuffer buf = (ByteBuffer) req.getMessage();
-
-                // The first unwritten empty buffer must be
-                // forwarded to the filter chain.
-                if (buf.hasRemaining()) {
-                    req.getFuture().setWritten(false);
-                } else {
-                    session.getFilterChain().fireMessageSent(session, req);
-                }
-            } else {
-                req.getFuture().setWritten(false);
-            }
-
-            // Discard others.
-            while ((req = writeRequestQueue.poll()) != null) {
-                req.getFuture().setWritten(false);
-            }
+    
+    private static class IoSessionIterator implements Iterator<AbstractIoSession> {
+        private final Iterator<SelectionKey> i;
+        private IoSessionIterator(Set<SelectionKey> keys) {
+            i = keys.iterator();
         }
-    }
-
-    private boolean doFlush(SocketSessionImpl session) throws IOException {
-        // Clear OP_WRITE
-        SelectionKey key = session.getSelectionKey();
-        key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
-
-        SocketChannel ch = session.getChannel();
-        Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
-
-        int writtenBytes = 0;
-        int maxWrittenBytes = session.getConfig().getSendBufferSize() << 1;
-        try {
-            do {
-                // Check for pending writes.
-                WriteRequest req = writeRequestQueue.peek();
-
-                if (req == null) {
-                    break;
-                }
-
-                Object message = req.getMessage();
-                if (message instanceof FileRegion) {
-                    FileRegion region = (FileRegion) message;
-
-                    if (region.getCount() <= 0) {
-                        // File has been sent, remove from queue
-                        writeRequestQueue.poll();
-                        session.increaseWrittenMessages();
-                        session.getFilterChain().fireMessageSent(session, req);
-                        continue;
-                    }
-
-                    if (key.isWritable()) {
-                        long localWrittenBytes =
-                                region.getFileChannel().transferTo(region.getPosition(), region.getCount(), ch);
-                        region.setPosition(region.getPosition() + localWrittenBytes);
-                        writtenBytes += localWrittenBytes;
-                    }
-
-                    if (region.getCount() > 0 || writtenBytes >= maxWrittenBytes) {
-                        // Kernel buffer is full or wrote too much.
-                        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
-                        return false;
-                    }
-
-                } else {
-                    ByteBuffer buf = (ByteBuffer) message;
-                    if (buf.remaining() == 0) {
-                        // Buffer has been completely sent, remove request form queue
-                        writeRequestQueue.poll();
-
-                        session.increaseWrittenMessages();
-
-                        buf.reset();
-                        session.getFilterChain().fireMessageSent(session, req);
-                        continue;
-                    }
-
-                    if (key.isWritable()) {
-                        writtenBytes += ch.write(buf.buf());
-                    }
-
-                    if (buf.hasRemaining() || writtenBytes >= maxWrittenBytes) {
-                        // Kernel buffer is full or wrote too much.
-                        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
-                        return false;
-                    }
-                }
-            } while (writtenBytes < maxWrittenBytes);
-        } finally {
-            session.increaseWrittenBytes(writtenBytes);
+        public boolean hasNext() {
+            return i.hasNext();
         }
 
-        return true;
-    }
-
-    private void doUpdateTrafficMask() {
-        for (; ;) {
-            SocketSessionImpl session = trafficControllingSessions.poll();
-
-            if (session == null) {
-                break;
-            }
-
-            SelectionKey key = session.getSelectionKey();
-            // Retry later if session is not yet fully initialized.
-            // (In case that Session.suspend??() or session.resume??() is
-            // called before addSession() is processed)
-            if (key == null) {
-                scheduleTrafficControl(session);
-                break;
-            }
-            // skip if channel is already closed
-            if (!key.isValid()) {
-                continue;
-            }
-
-            // The normal is OP_READ and, if there are write requests in the
-            // session's write queue, set OP_WRITE to trigger flushing.
-            int ops = SelectionKey.OP_READ;
-            if (!session.getWriteRequestQueue().isEmpty()) {
-                ops |= SelectionKey.OP_WRITE;
-            }
-
-            // Now mask the preferred ops with the mask of the current session
-            int mask = session.getTrafficMask().getInterestOps();
-            key.interestOps(ops & mask);
+        public AbstractIoSession next() {
+            SelectionKey key = i.next();
+            return (AbstractIoSession) key.attachment();
         }
-    }
 
-    private class Worker implements Runnable {
-        public void run() {
-            Thread.currentThread().setName(SocketIoProcessor.this.threadName);
-
-            for (; ;) {
-                try {
-                    int nKeys = selector.select(1000);
-                    doAddNew();
-                    doUpdateTrafficMask();
-
-                    if (nKeys > 0) {
-                        process(selector.selectedKeys());
-                    }
-
-                    doFlush();
-                    doRemove();
-                    notifyIdleness();
-
-                    if (selector.keys().isEmpty()) {
-                        synchronized (lock) {
-                            if (selector.keys().isEmpty() && newSessions.isEmpty()) {
-                                worker = null;
-                                break;
-                            }
-                        }
-                    }
-                } catch (Throwable t) {
-                    ExceptionMonitor.getInstance().exceptionCaught(t);
-
-                    try {
-                        Thread.sleep(1000);
-                    } catch (InterruptedException e1) {
-                        ExceptionMonitor.getInstance().exceptionCaught(e1);
-                    }
-                }
-            }
+        public void remove() {
+            i.remove();
         }
     }
 }