You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2007/11/28 11:13:50 UTC

svn commit: r598935 - in /mina/trunk: core/src/main/java/org/apache/mina/common/ core/src/main/java/org/apache/mina/transport/socket/nio/ transport-apr/src/main/java/org/apache/mina/transport/socket/apr/

Author: trustin
Date: Wed Nov 28 02:13:44 2007
New Revision: 598935

URL: http://svn.apache.org/viewvc?rev=598935&view=rev
Log:
* Fixed a possible problem related to write operations in the datagram transport.
* Fixed a problem where throughput was not updated.


Modified:
    mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java
    mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoProcessor.java
    mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java
    mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java?rev=598935&r1=598934&r2=598935&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java Wed Nov 28 02:13:44 2007
@@ -40,12 +40,17 @@
                 AbstractIoService s = (AbstractIoService) service;
                 s.setLastReadTime(s.getActivationTime());
                 s.setLastWriteTime(s.getActivationTime());
+                s.lastThroughputCalculationTime = s.getActivationTime();
                 
                 // Start idleness notification.
                 IdleStatusChecker.getInstance().addService(s);
             }
 
-            public void serviceDeactivated(IoService service) {}
+            public void serviceDeactivated(IoService service) {
+                IdleStatusChecker.getInstance().removeService(
+                        (AbstractIoService) service);
+            }
+
             public void serviceIdle(IoService service, IdleStatus idleStatus) {}
             public void sessionCreated(IoSession session) {}
             public void sessionDestroyed(IoSession session) {}

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoProcessor.java?rev=598935&r1=598934&r2=598935&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoProcessor.java Wed Nov 28 02:13:44 2007
@@ -130,7 +130,6 @@
     protected abstract Iterator<T> selectedSessions();
     protected abstract SessionState state(T session);
 
-
     /**
      * Is the session ready for writing
      * @param session the session queried
@@ -176,14 +175,10 @@
     protected abstract boolean isInterestedInWrite(T session);
 
     protected abstract void init(T session) throws Exception;
-
     protected abstract void destroy(T session) throws Exception;
-
     protected abstract int read(T session, IoBuffer buf) throws Exception;
-
     protected abstract int write(T session, IoBuffer buf, int length) throws Exception;
-
-    protected abstract long transferFile(T session, FileRegion region, int length) throws Exception;
+    protected abstract int transferFile(T session, FileRegion region, int length) throws Exception;
 
     public final void add(T session) {
         if (isDisposing()) {
@@ -399,12 +394,15 @@
         IoSessionConfig config = session.getConfig();
         IoBuffer buf = IoBuffer.allocate(config.getReadBufferSize());
 
+        final boolean hasFragmentation =
+            session.getTransportMetadata().hasFragmentation();
+
         try {
             int readBytes = 0;
             int ret;
 
             try {
-                if (session.getTransportMetadata().hasFragmentation()) {
+                if (hasFragmentation) {
                     while ((ret = read(session, buf)) > 0) {
                         readBytes += ret;
                         if (!buf.hasRemaining()) {
@@ -425,7 +423,7 @@
                 session.getFilterChain().fireMessageReceived(buf);
                 buf = null;
 
-                if (session.getTransportMetadata().hasFragmentation()) {
+                if (hasFragmentation) {
                     if (readBytes << 1 < config.getReadBufferSize()) {
                         session.decreaseReadBufferSize();
                     } else if (readBytes == config.getReadBufferSize()) {
@@ -497,6 +495,9 @@
             scheduleRemove(session);
             return false;
         }
+        
+        final boolean hasFragmentation = 
+            session.getTransportMetadata().hasFragmentation();
 
         try {
             // Clear OP_WRITE
@@ -510,7 +511,6 @@
             int maxWrittenBytes = session.getConfig().getMaxReadBufferSize() +
                                   (session.getConfig().getMaxReadBufferSize() >>> 1);
             int writtenBytes = 0;
-    
             do {
                 // Check for pending writes.
                 WriteRequest req = session.getCurrentWriteRequest();
@@ -522,41 +522,56 @@
                     session.setCurrentWriteRequest(req);
                 }
     
-                long localWrittenBytes = 0;
+                int localWrittenBytes = 0;
                 Object message = req.getMessage();
                 if (message instanceof FileRegion) {
                     FileRegion region = (FileRegion) message;
-    
-                    if (region.getCount() <= 0) {
+                    if (region.getCount() > 0) {
+                        int length;
+                        if (hasFragmentation) {
+                            length = (int) Math.min(
+                                    region.getCount(), 
+                                    maxWrittenBytes - writtenBytes);
+                        } else {
+                            length = (int) Math.min(
+                                    Integer.MAX_VALUE, region.getCount());
+                        }
+                        localWrittenBytes = transferFile(session, region, length);
+                        region.setPosition(region.getPosition() + localWrittenBytes);
+                    }
+
+                    if (region.getCount() <= 0 ||
+                            (!hasFragmentation && localWrittenBytes != 0)) {
                         // File has been sent, clear the current request.
                         session.setCurrentWriteRequest(null);
                         session.getFilterChain().fireMessageSent(req);
-                        continue;
                     }
-    
-                    localWrittenBytes = transferFile(
-                            session, region, (int) Math.min(
-                                    region.getCount(), 
-                                    maxWrittenBytes - writtenBytes));
-                    region.setPosition(region.getPosition() + localWrittenBytes);
                 } else {
                     IoBuffer buf = (IoBuffer) message;
-                    if (buf.remaining() == 0) {
+                    if (buf.hasRemaining()) {
+                        for (int i = WRITE_SPIN_COUNT; i > 0; i --) {
+                            int length;
+                            if (hasFragmentation) {
+                                length = Math.min(
+                                        buf.remaining(), 
+                                        maxWrittenBytes - writtenBytes);
+                            } else {
+                                length = buf.remaining();
+                            }
+    
+                            localWrittenBytes = write(session, buf, length);
+                            if (localWrittenBytes != 0 || !buf.hasRemaining()) {
+                                break;
+                            }
+                        }
+                    }
+
+                    if (!buf.hasRemaining() ||
+                            (!hasFragmentation && localWrittenBytes != 0)) {
                         // Buffer has been sent, clear the current request.
                         session.setCurrentWriteRequest(null);
                         buf.reset();
                         session.getFilterChain().fireMessageSent(req);
-                        continue;
-                    }
-    
-                    for (int i = WRITE_SPIN_COUNT; i > 0; i --) {
-                        localWrittenBytes = write(
-                                session, buf, Math.min(
-                                        buf.remaining(),
-                                        maxWrittenBytes - writtenBytes));
-                        if (localWrittenBytes != 0 || !buf.hasRemaining()) {
-                            break;
-                        }
                     }
                 }
                 

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java?rev=598935&r1=598934&r2=598935&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java Wed Nov 28 02:13:44 2007
@@ -49,8 +49,6 @@
     private final Object lock = new Object();
     private final Runnable notifyingTask = new NamePreservingRunnable(
             new NotifyingTask(), "IdleStatusChecker");
-    private final IoServiceListener serviceDeactivationListener = 
-        new ServiceDeactivationListener();
     private final IoFutureListener<IoFuture> sessionCloseListener =
         new SessionCloseListener();
     private volatile ScheduledExecutorService executor;
@@ -87,8 +85,6 @@
                 start();
             }
         }
-        
-        service.addListener(serviceDeactivationListener);
     }
 
     public void removeSession(AbstractIoSession session) {
@@ -113,7 +109,7 @@
         ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
         this.executor = executor;
         executor.scheduleWithFixedDelay(
-                notifyingTask, 1000, 1000, TimeUnit.SECONDS);
+                notifyingTask, 1000, 1000, TimeUnit.MILLISECONDS);
     }
     
     private void stop() {
@@ -128,38 +124,29 @@
     private class NotifyingTask implements Runnable {
         public void run() {
             long currentTime = System.currentTimeMillis();
+            notifyServices(currentTime);
+            notifySessions(currentTime);
+        }
 
-            synchronized (sessions) {
-                Iterator<AbstractIoSession> it = sessions.iterator();
-                while (it.hasNext()) {
-                    AbstractIoSession session = it.next();
-                    if (session.isConnected()) {
-                        notifyIdleSession(session, currentTime);
-                    }
+        private void notifyServices(long currentTime) {
+            Iterator<AbstractIoService> it = services.iterator();
+            while (it.hasNext()) {
+                AbstractIoService service = it.next();
+                if (service.isActive()) {
+                    notifyIdleness(service, currentTime, false);
                 }
             }
-            
-            synchronized (services) {
-                Iterator<AbstractIoService> it = services.iterator();
-                while (it.hasNext()) {
-                    AbstractIoService service = it.next();
-                    if (service.isActive()) {
-                        notifyIdleness(service, currentTime, false);
-                    }
+        }
+
+        private void notifySessions(long currentTime) {
+            Iterator<AbstractIoSession> it = sessions.iterator();
+            while (it.hasNext()) {
+                AbstractIoSession session = it.next();
+                if (session.isConnected()) {
+                    notifyIdleSession(session, currentTime);
                 }
             }
         }
-    }
-    
-    private class ServiceDeactivationListener implements IoServiceListener {
-        public void serviceDeactivated(IoService service) {
-            removeService((AbstractIoService) service);
-        }
-
-        public void serviceActivated(IoService service) {}
-        public void serviceIdle(IoService service, IdleStatus idleStatus) {}
-        public void sessionCreated(IoSession session) {}
-        public void sessionDestroyed(IoSession session) {}
     }
     
     private class SessionCloseListener implements IoFutureListener<IoFuture> {

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java?rev=598935&r1=598934&r2=598935&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java Wed Nov 28 02:13:44 2007
@@ -172,8 +172,8 @@
     }
 
     @Override
-    protected long transferFile(NioSession session, FileRegion region, int length) throws Exception {
-        return region.getFileChannel().transferTo(region.getPosition(), region.getCount(), session.getChannel());
+    protected int transferFile(NioSession session, FileRegion region, int length) throws Exception {
+        return (int) region.getFileChannel().transferTo(region.getPosition(), region.getCount(), session.getChannel());
     }
 
     protected static class IoSessionIterator implements Iterator<NioSession> {

Modified: mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java?rev=598935&r1=598934&r2=598935&view=diff
==============================================================================
--- mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java (original)
+++ mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java Wed Nov 28 02:13:44 2007
@@ -357,7 +357,7 @@
     }
 
     @Override
-    protected long transferFile(AprSession session, FileRegion region, int length)
+    protected int transferFile(AprSession session, FileRegion region, int length)
             throws Exception {
         throw new UnsupportedOperationException();
     }