You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by mh...@apache.org on 2007/07/28 23:53:35 UTC

svn commit: r560631 - in /mina/trunk/core/src/main/java/org/apache/mina: common/ common/support/ transport/socket/nio/

Author: mheath
Date: Sat Jul 28 14:53:34 2007
New Revision: 560631

URL: http://svn.apache.org/viewvc?view=rev&rev=560631
Log:
Implemented DIRMINA-218 - Added ability to transfer channels to socket 
channels owned by Mina.

Added:
    mina/trunk/core/src/main/java/org/apache/mina/common/SendFileRegion.java   (with props)
    mina/trunk/core/src/main/java/org/apache/mina/common/support/DefaultSendFileRegion.java   (with props)
Modified:
    mina/trunk/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java

Added: mina/trunk/core/src/main/java/org/apache/mina/common/SendFileRegion.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/SendFileRegion.java?view=auto&rev=560631
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/SendFileRegion.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/SendFileRegion.java Sat Jul 28 14:53:34 2007
@@ -0,0 +1,48 @@
+package org.apache.mina.common;
+
+import java.nio.channels.FileChannel;
+
+/**
+ * Indicates the region of a file to be sent to the remote host.
+ * 
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev: 560320 $, $Date: 2007-07-27 11:12:26 -0600 (Fri, 27 Jul 2007) $,
+ */
+public interface SendFileRegion {
+
+    /**
+     * The open <tt>FileChannel</tt> from which data will be read to send to the remote host.
+     * 
+     * @return  An open <tt>FileChannel</tt>.
+     */
+    FileChannel getFileChannel();
+    
+    /**
+     * The current file position from which data will be read. 
+     * 
+     * @return  The current file position.
+     */
+    long getPosition();
+    
+    /**
+     * Updates the current file position.  May not be negative.
+     * 
+     * @param value  The new value for the file position.
+     */
+    void setPosition(long value);
+    
+    /**
+     * The number of bytes that remain to be written from the file to the remote host.
+     * 
+     * @return  The number of bytes remaining to be written.
+     */
+    long getCount();
+    
+    /**
+     * The total number of bytes already written.
+     * 
+     * @return  The total number of bytes already written.
+     */
+    long getBytesWritten();
+    
+}

Propchange: mina/trunk/core/src/main/java/org/apache/mina/common/SendFileRegion.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java?view=diff&rev=560631&r1=560630&r2=560631
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java Sat Jul 28 14:53:34 2007
@@ -19,7 +19,11 @@
  */
 package org.apache.mina.common.support;
 
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
 import java.net.SocketAddress;
+import java.nio.channels.FileChannel;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -29,6 +33,7 @@
 import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.CloseFuture;
 import org.apache.mina.common.DefaultWriteRequest;
+import org.apache.mina.common.ExceptionMonitor;
 import org.apache.mina.common.IdleStatus;
 import org.apache.mina.common.IoAcceptor;
 import org.apache.mina.common.IoService;
@@ -148,6 +153,19 @@
                 && !((ByteBuffer) message).hasRemaining()) {
             throw new IllegalArgumentException(
                     "message is empty. Forgot to call flip()?");
+        } else if (message instanceof File || message instanceof FileChannel) {
+            try {
+                FileChannel channel;
+                if (message instanceof File) {
+                    File file = (File) message;
+                    channel = new FileInputStream(file).getChannel();
+                } else {
+                    channel = (FileChannel) message;
+                }
+                message = new DefaultSendFileRegion(channel, 0, channel.size());
+            } catch (IOException e) {
+                ExceptionMonitor.getInstance().exceptionCaught(e);
+            }
         }
 
         synchronized (lock) {
@@ -407,7 +425,7 @@
         idleCountForRead = 0;
     }
 
-    public void increaseWrittenBytes(int increment) {
+    public void increaseWrittenBytes(long increment) {
         writtenBytes += increment;
         lastWriteTime = System.currentTimeMillis();
         idleCountForBoth = 0;

Added: mina/trunk/core/src/main/java/org/apache/mina/common/support/DefaultSendFileRegion.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/support/DefaultSendFileRegion.java?view=auto&rev=560631
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/support/DefaultSendFileRegion.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/support/DefaultSendFileRegion.java Sat Jul 28 14:53:34 2007
@@ -0,0 +1,95 @@
+package org.apache.mina.common.support;
+
+import java.nio.channels.FileChannel;
+
+import org.apache.mina.common.SendFileRegion;
+
+/**
+ * Default {@link SendFileRegion} implementation that tracks how much of a
+ * file region is still to be sent to the remote host.
+ * <p>
+ * Advancing the position with {@link #setPosition(long)} removes the same
+ * number of bytes from {@link #getCount()}, so the count reaches zero once
+ * the whole region has been written.
+ */
+public class DefaultSendFileRegion implements SendFileRegion {
+
+    private final FileChannel channel;
+    
+    /** Position the region started at; basis for {@link #getBytesWritten()}. */
+    private final long originalPosition;
+    /** File position of the next byte to send. */
+    private long position;
+    /** Number of bytes that still have to be sent. */
+    private long count;
+    
+    /**
+     * Creates a region of <tt>count</tt> bytes starting at <tt>position</tt>.
+     *
+     * @param channel   the channel to read from; must not be <tt>null</tt>
+     * @param position  the position of the first byte to send; must not be negative
+     * @param count     the number of bytes to send; must not be negative
+     * @throws IllegalArgumentException if an argument violates these constraints
+     */
+    public DefaultSendFileRegion(FileChannel channel, long position, long count) {
+        if (channel == null) {
+            throw new IllegalArgumentException("channel can not be null");
+        }
+        if (position < 0) {
+            throw new IllegalArgumentException("position may not be less than 0");
+        }
+        if (count < 0) {
+            throw new IllegalArgumentException("count may not be less than 0");
+        }
+        this.channel = channel;
+        this.originalPosition = position;
+        this.position = position;
+        this.count = count;
+    }
+    
+    /**
+     * @return  the number of bytes the position has advanced since construction
+     */
+    public long getBytesWritten() {
+        return position - originalPosition;
+    }
+
+    /**
+     * @return  the number of bytes that still have to be sent
+     */
+    public long getCount() {
+        return count;
+    }
+
+    /**
+     * @return  the channel this region reads from
+     */
+    public FileChannel getFileChannel() {
+        return channel;
+    }
+
+    /**
+     * @return  the file position of the next byte to send
+     */
+    public long getPosition() {
+        return position;
+    }
+
+    /**
+     * Advances the file position and reduces the remaining count by the
+     * number of bytes moved past.
+     *
+     * @param value  the new file position; must not be less than the current one
+     * @throws IllegalArgumentException if <tt>value</tt> is less than the current position
+     */
+    public void setPosition(long value) {
+        if (value < position) {
+            throw new IllegalArgumentException("New position value may not be less than old position value");
+        }
+        // The bytes between the old and the new position have been sent, so
+        // subtract them: SocketIoProcessor treats getCount() <= 0 as "done".
+        count -= value - position;
+        position = value;
+    }
+
+}

Propchange: mina/trunk/core/src/main/java/org/apache/mina/common/support/DefaultSendFileRegion.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java?view=diff&rev=560631&r1=560630&r2=560631
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java Sat Jul 28 14:53:34 2007
@@ -20,9 +20,7 @@
 package org.apache.mina.transport.socket.nio;
 
 import java.io.IOException;
-import java.util.Queue;
 
-import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.IoFilterChain;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.WriteRequest;
@@ -42,23 +40,7 @@
     @Override
     protected void doWrite(IoSession session, WriteRequest writeRequest) {
         SocketSessionImpl s = (SocketSessionImpl) session;
-        Queue<WriteRequest> writeRequestQueue = s.getWriteRequestQueue();
-
-        // SocketIoProcessor.doFlush() will reset it after write is finished
-        // because the buffer will be passed with messageSent event. 
-        ((ByteBuffer) writeRequest.getMessage()).mark();
-
-        int writeRequestQueueSize;
-        synchronized (writeRequestQueue) {
-            writeRequestQueue.offer(writeRequest);
-            writeRequestQueueSize = writeRequestQueue.size();
-        }
-
-        if (writeRequestQueueSize == 1 && session.getTrafficMask().isWritable()) {
-            // Notify SocketIoProcessor only when writeRequestQueue was empty.
-            s.getIoProcessor().flush(s);
-        }
-
+        s.queueWriteRequest(writeRequest);
     }
 
     @Override

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java?view=diff&rev=560631&r1=560630&r2=560631
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java Sat Jul 28 14:53:34 2007
@@ -35,6 +35,7 @@
 import org.apache.mina.common.IoService;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.RuntimeIOException;
+import org.apache.mina.common.SendFileRegion;
 import org.apache.mina.common.WriteRequest;
 import org.apache.mina.common.WriteTimeoutException;
 import org.apache.mina.common.support.IoServiceListenerSupport;
@@ -112,19 +113,13 @@
         boolean needsWakeup = flushingSessions.isEmpty();
         scheduleFlush(session);
         if (needsWakeup) {
-            Selector selector = this.selector;
-            if (selector != null) {
-                selector.wakeup();
-            }
+            selector.wakeup();
         }
     }
 
     void updateTrafficMask(SocketSessionImpl session) {
         scheduleTrafficControl(session);
-        Selector selector = this.selector;
-        if (selector != null) {
-            selector.wakeup();
-        }
+        selector.wakeup();
     }
 
     private void scheduleRemove(SocketSessionImpl session) {
@@ -379,6 +374,7 @@
         for (;;) {
             WriteRequest req;
 
+            // Check for pending writes.
             synchronized (writeRequestQueue) {
                 req = writeRequestQueue.peek();
             }
@@ -387,30 +383,61 @@
                 break;
             }
 
-            ByteBuffer buf = (ByteBuffer) req.getMessage();
-            if (buf.remaining() == 0) {
-                synchronized (writeRequestQueue) {
-                    writeRequestQueue.poll();
+            Object message = req.getMessage();
+            if (message instanceof SendFileRegion) {
+                SendFileRegion region = (SendFileRegion) message; 
+
+                if (region.getCount() <= 0) {
+                    // File has been sent, remove from queue
+                    synchronized (writeRequestQueue) {
+                        writeRequestQueue.poll();
+                    }
+                    session.increaseWrittenMessages();
+                    session.getFilterChain().fireMessageSent(session, req);
+                    continue;
+                }
+                
+                if (key.isWritable()) {
+                    long writtenBytes = region.getFileChannel().transferTo(region.getPosition(), region.getCount(), ch);
+                    region.setPosition(region.getPosition() + writtenBytes);
+                    
+                    if (writtenBytes > 0) {
+                        session.increaseWrittenBytes(writtenBytes);
+                    }
+                }
+                
+                if (region.getCount() > 0) {
+                    key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+                    break;
                 }
 
-                session.increaseWrittenMessages();
+            } else {
+                ByteBuffer buf = (ByteBuffer) message;
+                if (buf.remaining() == 0) {
+                    // Buffer has been completely sent, remove request from queue
+                    synchronized (writeRequestQueue) {
+                        writeRequestQueue.poll();
+                    }
 
-                buf.reset();
-                session.getFilterChain().fireMessageSent(session, req);
-                continue;
-            }
+                    session.increaseWrittenMessages();
 
-            if (key.isWritable()) {
-                int writtenBytes = ch.write(buf.buf());
-                if (writtenBytes > 0) {
-                    session.increaseWrittenBytes(writtenBytes);
+                    buf.reset();
+                    session.getFilterChain().fireMessageSent(session, req);
+                    continue;
                 }
-            }
 
-            if (buf.hasRemaining()) {
-                // Kernel buffer is full
-                key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
-                break;
+                if (key.isWritable()) {
+                    int writtenBytes = ch.write(buf.buf());
+                    if (writtenBytes > 0) {
+                        session.increaseWrittenBytes(writtenBytes);
+                    }
+                }
+
+                if (buf.hasRemaining()) {
+                    // Kernel buffer is full
+                    key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+                    break;
+                }
             }
         }
     }

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java?view=diff&rev=560631&r1=560630&r2=560631
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java Sat Jul 28 14:53:34 2007
@@ -335,4 +335,23 @@
             }
         }
     }
+    
+    void queueWriteRequest(WriteRequest writeRequest) {
+        if (writeRequest.getMessage() instanceof ByteBuffer) {
+            // Mark the buffer here: SocketIoProcessor.doFlush() resets it once the
+            // write has finished, before passing it on with the messageSent event.
+            ((ByteBuffer) writeRequest.getMessage()).mark();
+        }
+
+        int writeRequestQueueSize;
+        synchronized (writeRequestQueue) {
+            writeRequestQueue.offer(writeRequest);
+            writeRequestQueueSize = writeRequestQueue.size();
+        }
+
+        if (writeRequestQueueSize == 1 && getTrafficMask().isWritable()) {
+            // A size of 1 means the queue was empty before: only then notify SocketIoProcessor.
+            getIoProcessor().flush(this);
+        }
+    }
 }