You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2007/11/28 11:13:50 UTC
svn commit: r598935 - in /mina/trunk:
core/src/main/java/org/apache/mina/common/
core/src/main/java/org/apache/mina/transport/socket/nio/
transport-apr/src/main/java/org/apache/mina/transport/socket/apr/
Author: trustin
Date: Wed Nov 28 02:13:44 2007
New Revision: 598935
URL: http://svn.apache.org/viewvc?rev=598935&view=rev
Log:
* Fixed a possible problem related to write operations in datagram transport.
* Fixed a problem where throughput was not updated.
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoProcessor.java
mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java
mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java?rev=598935&r1=598934&r2=598935&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java Wed Nov 28 02:13:44 2007
@@ -40,12 +40,17 @@
AbstractIoService s = (AbstractIoService) service;
s.setLastReadTime(s.getActivationTime());
s.setLastWriteTime(s.getActivationTime());
+ s.lastThroughputCalculationTime = s.getActivationTime();
// Start idleness notification.
IdleStatusChecker.getInstance().addService(s);
}
- public void serviceDeactivated(IoService service) {}
+ public void serviceDeactivated(IoService service) {
+ IdleStatusChecker.getInstance().removeService(
+ (AbstractIoService) service);
+ }
+
public void serviceIdle(IoService service, IdleStatus idleStatus) {}
public void sessionCreated(IoSession session) {}
public void sessionDestroyed(IoSession session) {}
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoProcessor.java?rev=598935&r1=598934&r2=598935&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoProcessor.java Wed Nov 28 02:13:44 2007
@@ -130,7 +130,6 @@
protected abstract Iterator<T> selectedSessions();
protected abstract SessionState state(T session);
-
/**
* Is the session ready for writing
* @param session the session queried
@@ -176,14 +175,10 @@
protected abstract boolean isInterestedInWrite(T session);
protected abstract void init(T session) throws Exception;
-
protected abstract void destroy(T session) throws Exception;
-
protected abstract int read(T session, IoBuffer buf) throws Exception;
-
protected abstract int write(T session, IoBuffer buf, int length) throws Exception;
-
- protected abstract long transferFile(T session, FileRegion region, int length) throws Exception;
+ protected abstract int transferFile(T session, FileRegion region, int length) throws Exception;
public final void add(T session) {
if (isDisposing()) {
@@ -399,12 +394,15 @@
IoSessionConfig config = session.getConfig();
IoBuffer buf = IoBuffer.allocate(config.getReadBufferSize());
+ final boolean hasFragmentation =
+ session.getTransportMetadata().hasFragmentation();
+
try {
int readBytes = 0;
int ret;
try {
- if (session.getTransportMetadata().hasFragmentation()) {
+ if (hasFragmentation) {
while ((ret = read(session, buf)) > 0) {
readBytes += ret;
if (!buf.hasRemaining()) {
@@ -425,7 +423,7 @@
session.getFilterChain().fireMessageReceived(buf);
buf = null;
- if (session.getTransportMetadata().hasFragmentation()) {
+ if (hasFragmentation) {
if (readBytes << 1 < config.getReadBufferSize()) {
session.decreaseReadBufferSize();
} else if (readBytes == config.getReadBufferSize()) {
@@ -497,6 +495,9 @@
scheduleRemove(session);
return false;
}
+
+ final boolean hasFragmentation =
+ session.getTransportMetadata().hasFragmentation();
try {
// Clear OP_WRITE
@@ -510,7 +511,6 @@
int maxWrittenBytes = session.getConfig().getMaxReadBufferSize() +
(session.getConfig().getMaxReadBufferSize() >>> 1);
int writtenBytes = 0;
-
do {
// Check for pending writes.
WriteRequest req = session.getCurrentWriteRequest();
@@ -522,41 +522,56 @@
session.setCurrentWriteRequest(req);
}
- long localWrittenBytes = 0;
+ int localWrittenBytes = 0;
Object message = req.getMessage();
if (message instanceof FileRegion) {
FileRegion region = (FileRegion) message;
-
- if (region.getCount() <= 0) {
+ if (region.getCount() > 0) {
+ int length;
+ if (hasFragmentation) {
+ length = (int) Math.min(
+ region.getCount(),
+ maxWrittenBytes - writtenBytes);
+ } else {
+ length = (int) Math.min(
+ Integer.MAX_VALUE, region.getCount());
+ }
+ localWrittenBytes = transferFile(session, region, length);
+ region.setPosition(region.getPosition() + localWrittenBytes);
+ }
+
+ if (region.getCount() <= 0 ||
+ (!hasFragmentation && localWrittenBytes != 0)) {
// File has been sent, clear the current request.
session.setCurrentWriteRequest(null);
session.getFilterChain().fireMessageSent(req);
- continue;
}
-
- localWrittenBytes = transferFile(
- session, region, (int) Math.min(
- region.getCount(),
- maxWrittenBytes - writtenBytes));
- region.setPosition(region.getPosition() + localWrittenBytes);
} else {
IoBuffer buf = (IoBuffer) message;
- if (buf.remaining() == 0) {
+ if (buf.hasRemaining()) {
+ for (int i = WRITE_SPIN_COUNT; i > 0; i --) {
+ int length;
+ if (hasFragmentation) {
+ length = Math.min(
+ buf.remaining(),
+ maxWrittenBytes - writtenBytes);
+ } else {
+ length = buf.remaining();
+ }
+
+ localWrittenBytes = write(session, buf, length);
+ if (localWrittenBytes != 0 || !buf.hasRemaining()) {
+ break;
+ }
+ }
+ }
+
+ if (!buf.hasRemaining() ||
+ (!hasFragmentation && localWrittenBytes != 0)) {
// Buffer has been sent, clear the current request.
session.setCurrentWriteRequest(null);
buf.reset();
session.getFilterChain().fireMessageSent(req);
- continue;
- }
-
- for (int i = WRITE_SPIN_COUNT; i > 0; i --) {
- localWrittenBytes = write(
- session, buf, Math.min(
- buf.remaining(),
- maxWrittenBytes - writtenBytes));
- if (localWrittenBytes != 0 || !buf.hasRemaining()) {
- break;
- }
}
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java?rev=598935&r1=598934&r2=598935&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java Wed Nov 28 02:13:44 2007
@@ -49,8 +49,6 @@
private final Object lock = new Object();
private final Runnable notifyingTask = new NamePreservingRunnable(
new NotifyingTask(), "IdleStatusChecker");
- private final IoServiceListener serviceDeactivationListener =
- new ServiceDeactivationListener();
private final IoFutureListener<IoFuture> sessionCloseListener =
new SessionCloseListener();
private volatile ScheduledExecutorService executor;
@@ -87,8 +85,6 @@
start();
}
}
-
- service.addListener(serviceDeactivationListener);
}
public void removeSession(AbstractIoSession session) {
@@ -113,7 +109,7 @@
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
this.executor = executor;
executor.scheduleWithFixedDelay(
- notifyingTask, 1000, 1000, TimeUnit.SECONDS);
+ notifyingTask, 1000, 1000, TimeUnit.MILLISECONDS);
}
private void stop() {
@@ -128,38 +124,29 @@
private class NotifyingTask implements Runnable {
public void run() {
long currentTime = System.currentTimeMillis();
+ notifyServices(currentTime);
+ notifySessions(currentTime);
+ }
- synchronized (sessions) {
- Iterator<AbstractIoSession> it = sessions.iterator();
- while (it.hasNext()) {
- AbstractIoSession session = it.next();
- if (session.isConnected()) {
- notifyIdleSession(session, currentTime);
- }
+ private void notifyServices(long currentTime) {
+ Iterator<AbstractIoService> it = services.iterator();
+ while (it.hasNext()) {
+ AbstractIoService service = it.next();
+ if (service.isActive()) {
+ notifyIdleness(service, currentTime, false);
}
}
-
- synchronized (services) {
- Iterator<AbstractIoService> it = services.iterator();
- while (it.hasNext()) {
- AbstractIoService service = it.next();
- if (service.isActive()) {
- notifyIdleness(service, currentTime, false);
- }
+ }
+
+ private void notifySessions(long currentTime) {
+ Iterator<AbstractIoSession> it = sessions.iterator();
+ while (it.hasNext()) {
+ AbstractIoSession session = it.next();
+ if (session.isConnected()) {
+ notifyIdleSession(session, currentTime);
}
}
}
- }
-
- private class ServiceDeactivationListener implements IoServiceListener {
- public void serviceDeactivated(IoService service) {
- removeService((AbstractIoService) service);
- }
-
- public void serviceActivated(IoService service) {}
- public void serviceIdle(IoService service, IdleStatus idleStatus) {}
- public void sessionCreated(IoSession session) {}
- public void sessionDestroyed(IoSession session) {}
}
private class SessionCloseListener implements IoFutureListener<IoFuture> {
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java?rev=598935&r1=598934&r2=598935&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java Wed Nov 28 02:13:44 2007
@@ -172,8 +172,8 @@
}
@Override
- protected long transferFile(NioSession session, FileRegion region, int length) throws Exception {
- return region.getFileChannel().transferTo(region.getPosition(), region.getCount(), session.getChannel());
+ protected int transferFile(NioSession session, FileRegion region, int length) throws Exception {
+ return (int) region.getFileChannel().transferTo(region.getPosition(), region.getCount(), session.getChannel());
}
protected static class IoSessionIterator implements Iterator<NioSession> {
Modified: mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java?rev=598935&r1=598934&r2=598935&view=diff
==============================================================================
--- mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java (original)
+++ mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java Wed Nov 28 02:13:44 2007
@@ -357,7 +357,7 @@
}
@Override
- protected long transferFile(AprSession session, FileRegion region, int length)
+ protected int transferFile(AprSession session, FileRegion region, int length)
throws Exception {
throw new UnsupportedOperationException();
}