You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by mh...@apache.org on 2007/07/28 23:53:35 UTC
svn commit: r560631 - in /mina/trunk/core/src/main/java/org/apache/mina:
common/ common/support/ transport/socket/nio/
Author: mheath
Date: Sat Jul 28 14:53:34 2007
New Revision: 560631
URL: http://svn.apache.org/viewvc?view=rev&rev=560631
Log:
Implemented DIRMINA-218 - Added ability to transfer channels to socket
channels owned by Mina.
Added:
mina/trunk/core/src/main/java/org/apache/mina/common/SendFileRegion.java (with props)
mina/trunk/core/src/main/java/org/apache/mina/common/support/DefaultSendFileRegion.java (with props)
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java
Added: mina/trunk/core/src/main/java/org/apache/mina/common/SendFileRegion.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/SendFileRegion.java?view=auto&rev=560631
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/SendFileRegion.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/SendFileRegion.java Sat Jul 28 14:53:34 2007
@@ -0,0 +1,48 @@
+package org.apache.mina.common;
+
+import java.nio.channels.FileChannel;
+
+/**
+ * Indicates the region of a file to be sent to the remote host.
+ *
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev: 560320 $, $Date: 2007-07-27 11:12:26 -0600 (Fri, 27 Jul 2007) $,
+ */
+public interface SendFileRegion {
+
+ /**
+ * The open <tt>FileChannel</tt> from which data will be read to send to remote host.
+ *
+ * @return An open <tt>FileChannel</tt>.
+ */
+ FileChannel getFileChannel();
+
+ /**
+ * The current file position from which data will be read.
+ *
+ * @return The current file position.
+ */
+ long getPosition();
+
+ /**
+ * Updates the current file position. May not be negative.
+ *
+ * @param value The new value for the file position.
+ */
+ void setPosition(long value);
+
+ /**
+ * The number of bytes to be written from the file to the remote host.
+ * <p>
+ * NOTE(review): the NIO socket processor added in the same commit passes this
+ * value as the maximum byte count of each transfer and treats a value of zero
+ * or less as "file completely sent", so it appears to mean the bytes still
+ * <em>remaining</em> rather than the original total - confirm with implementors.
+ *
+ * @return The number of bytes to be written.
+ */
+ long getCount();
+
+ /**
+ * The total number of bytes already written.
+ *
+ * @return The total number of bytes already written.
+ */
+ long getBytesWritten();
+
+}
Propchange: mina/trunk/core/src/main/java/org/apache/mina/common/SendFileRegion.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java?view=diff&rev=560631&r1=560630&r2=560631
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java Sat Jul 28 14:53:34 2007
@@ -19,7 +19,11 @@
*/
package org.apache.mina.common.support;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
import java.net.SocketAddress;
+import java.nio.channels.FileChannel;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -29,6 +33,7 @@
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.CloseFuture;
import org.apache.mina.common.DefaultWriteRequest;
+import org.apache.mina.common.ExceptionMonitor;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoAcceptor;
import org.apache.mina.common.IoService;
@@ -148,6 +153,19 @@
&& !((ByteBuffer) message).hasRemaining()) {
throw new IllegalArgumentException(
"message is empty. Forgot to call flip()?");
+ } else if (message instanceof File || message instanceof FileChannel) {
+ try {
+ FileChannel channel;
+ if (message instanceof File) {
+ File file = (File) message;
+ channel = new FileInputStream(file).getChannel();
+ } else {
+ channel = (FileChannel) message;
+ }
+ message = new DefaultSendFileRegion(channel, 0, channel.size());
+ } catch (IOException e) {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
}
synchronized (lock) {
@@ -407,7 +425,7 @@
idleCountForRead = 0;
}
- public void increaseWrittenBytes(int increment) {
+ public void increaseWrittenBytes(long increment) {
writtenBytes += increment;
lastWriteTime = System.currentTimeMillis();
idleCountForBoth = 0;
Added: mina/trunk/core/src/main/java/org/apache/mina/common/support/DefaultSendFileRegion.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/support/DefaultSendFileRegion.java?view=auto&rev=560631
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/support/DefaultSendFileRegion.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/support/DefaultSendFileRegion.java Sat Jul 28 14:53:34 2007
@@ -0,0 +1,55 @@
+package org.apache.mina.common.support;
+
+import java.nio.channels.FileChannel;
+
+import org.apache.mina.common.SendFileRegion;
+
+/**
+ * Default {@link SendFileRegion} implementation: a window onto an open
+ * <tt>FileChannel</tt> described by a current position and the number of
+ * bytes that still have to be sent from that position.
+ */
+public class DefaultSendFileRegion implements SendFileRegion {
+
+    private final FileChannel channel;
+
+    /** Position the region started at; used to derive the bytes written so far. */
+    private final long originalPosition;
+    /** Position of the next byte to be read from the channel. */
+    private long position;
+    /** Number of bytes that still have to be sent, counted from {@link #position}. */
+    private long count;
+
+    /**
+     * Creates a region covering <tt>count</tt> bytes of <tt>channel</tt>
+     * starting at <tt>position</tt>.
+     *
+     * @param channel  the channel to read from; must not be <tt>null</tt>
+     * @param position the file position of the first byte to send; must not be negative
+     * @param count    the number of bytes to send; must not be negative
+     * @throws IllegalArgumentException if <tt>channel</tt> is <tt>null</tt> or
+     *         <tt>position</tt> or <tt>count</tt> is negative
+     */
+    public DefaultSendFileRegion(FileChannel channel, long position, long count) {
+        if (channel == null) {
+            throw new IllegalArgumentException("channel can not be null");
+        }
+        if (position < 0) {
+            throw new IllegalArgumentException("position may not be less than 0");
+        }
+        if (count < 0) {
+            throw new IllegalArgumentException("count may not be less than 0");
+        }
+        this.channel = channel;
+        this.originalPosition = position;
+        this.position = position;
+        this.count = count;
+    }
+
+    /**
+     * @return the number of bytes sent so far, i.e. how far the position has
+     *         advanced from the position given at construction time
+     */
+    public long getBytesWritten() {
+        return position - originalPosition;
+    }
+
+    /**
+     * @return the number of bytes that still have to be sent
+     */
+    public long getCount() {
+        return count;
+    }
+
+    /**
+     * @return the channel data is read from
+     */
+    public FileChannel getFileChannel() {
+        return channel;
+    }
+
+    /**
+     * @return the file position of the next byte to be sent
+     */
+    public long getPosition() {
+        return position;
+    }
+
+    /**
+     * Advances the current position and reduces the remaining byte count by
+     * the distance moved.
+     *
+     * @param value the new position; must not be less than the current position
+     * @throws IllegalArgumentException if <tt>value</tt> is less than the
+     *         current position
+     */
+    public void setPosition(long value) {
+        if (value < position) {
+            throw new IllegalArgumentException("New position value may not be less than old position value");
+        }
+        // Moving forward consumes bytes: the remaining count must shrink,
+        // otherwise the flush loop never sees getCount() reach zero and the
+        // write request is never completed.
+        count -= value - position;
+        position = value;
+    }
+
+}
Propchange: mina/trunk/core/src/main/java/org/apache/mina/common/support/DefaultSendFileRegion.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java?view=diff&rev=560631&r1=560630&r2=560631
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java Sat Jul 28 14:53:34 2007
@@ -20,9 +20,7 @@
package org.apache.mina.transport.socket.nio;
import java.io.IOException;
-import java.util.Queue;
-import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.WriteRequest;
@@ -42,23 +40,7 @@
@Override
protected void doWrite(IoSession session, WriteRequest writeRequest) {
SocketSessionImpl s = (SocketSessionImpl) session;
- Queue<WriteRequest> writeRequestQueue = s.getWriteRequestQueue();
-
- // SocketIoProcessor.doFlush() will reset it after write is finished
- // because the buffer will be passed with messageSent event.
- ((ByteBuffer) writeRequest.getMessage()).mark();
-
- int writeRequestQueueSize;
- synchronized (writeRequestQueue) {
- writeRequestQueue.offer(writeRequest);
- writeRequestQueueSize = writeRequestQueue.size();
- }
-
- if (writeRequestQueueSize == 1 && session.getTrafficMask().isWritable()) {
- // Notify SocketIoProcessor only when writeRequestQueue was empty.
- s.getIoProcessor().flush(s);
- }
-
+ s.queueWriteRequest(writeRequest);
}
@Override
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java?view=diff&rev=560631&r1=560630&r2=560631
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java Sat Jul 28 14:53:34 2007
@@ -35,6 +35,7 @@
import org.apache.mina.common.IoService;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.RuntimeIOException;
+import org.apache.mina.common.SendFileRegion;
import org.apache.mina.common.WriteRequest;
import org.apache.mina.common.WriteTimeoutException;
import org.apache.mina.common.support.IoServiceListenerSupport;
@@ -112,19 +113,13 @@
boolean needsWakeup = flushingSessions.isEmpty();
scheduleFlush(session);
if (needsWakeup) {
- Selector selector = this.selector;
- if (selector != null) {
- selector.wakeup();
- }
+ selector.wakeup();
}
}
void updateTrafficMask(SocketSessionImpl session) {
scheduleTrafficControl(session);
- Selector selector = this.selector;
- if (selector != null) {
- selector.wakeup();
- }
+ selector.wakeup();
}
private void scheduleRemove(SocketSessionImpl session) {
@@ -379,6 +374,7 @@
for (;;) {
WriteRequest req;
+ // Check for pending writes.
synchronized (writeRequestQueue) {
req = writeRequestQueue.peek();
}
@@ -387,30 +383,61 @@
break;
}
- ByteBuffer buf = (ByteBuffer) req.getMessage();
- if (buf.remaining() == 0) {
- synchronized (writeRequestQueue) {
- writeRequestQueue.poll();
+ Object message = req.getMessage();
+ if (message instanceof SendFileRegion) {
+ SendFileRegion region = (SendFileRegion) message;
+
+ if (region.getCount() <= 0) {
+ // File has been sent, remove from queue
+ synchronized (writeRequestQueue) {
+ writeRequestQueue.poll();
+ }
+ session.increaseWrittenMessages();
+ session.getFilterChain().fireMessageSent(session, req);
+ continue;
+ }
+
+ if (key.isWritable()) {
+ long writtenBytes = region.getFileChannel().transferTo(region.getPosition(), region.getCount(), ch);
+ region.setPosition(region.getPosition() + writtenBytes);
+
+ if (writtenBytes > 0) {
+ session.increaseWrittenBytes(writtenBytes);
+ }
+ }
+
+ if (region.getCount() > 0) {
+ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+ break;
}
- session.increaseWrittenMessages();
+ } else {
+ ByteBuffer buf = (ByteBuffer) message;
+ if (buf.remaining() == 0) {
+ // Buffer has been completely sent, remove request from queue
+ synchronized (writeRequestQueue) {
+ writeRequestQueue.poll();
+ }
- buf.reset();
- session.getFilterChain().fireMessageSent(session, req);
- continue;
- }
+ session.increaseWrittenMessages();
- if (key.isWritable()) {
- int writtenBytes = ch.write(buf.buf());
- if (writtenBytes > 0) {
- session.increaseWrittenBytes(writtenBytes);
+ buf.reset();
+ session.getFilterChain().fireMessageSent(session, req);
+ continue;
}
- }
- if (buf.hasRemaining()) {
- // Kernel buffer is full
- key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
- break;
+ if (key.isWritable()) {
+ int writtenBytes = ch.write(buf.buf());
+ if (writtenBytes > 0) {
+ session.increaseWrittenBytes(writtenBytes);
+ }
+ }
+
+ if (buf.hasRemaining()) {
+ // Kernel buffer is full
+ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+ break;
+ }
}
}
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java?view=diff&rev=560631&r1=560630&r2=560631
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java Sat Jul 28 14:53:34 2007
@@ -335,4 +335,23 @@
}
}
}
+
+ /**
+  * Appends a write request to this session's write queue and, when the
+  * queue was empty beforehand and the session is currently writable, asks
+  * the I/O processor to flush the session.
+  *
+  * @param writeRequest the request to enqueue
+  */
+ void queueWriteRequest(WriteRequest writeRequest) {
+     Object message = writeRequest.getMessage();
+     if (message instanceof ByteBuffer) {
+         // The buffer is handed back with the messageSent event, so remember
+         // its position here; SocketIoProcessor.doFlush() resets to this mark
+         // once the write has finished.
+         ((ByteBuffer) message).mark();
+     }
+
+     boolean queueWasEmpty;
+     synchronized (writeRequestQueue) {
+         writeRequestQueue.offer(writeRequest);
+         queueWasEmpty = writeRequestQueue.size() == 1;
+     }
+
+     // The processor only needs a nudge for the first queued request.
+     if (queueWasEmpty && getTrafficMask().isWritable()) {
+         getIoProcessor().flush(this);
+     }
+ }
}