You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2007/09/30 12:51:54 UTC

svn commit: r580694 - in /mina/trunk/core/src/main/java/org/apache/mina: common/ filter/codec/ transport/socket/nio/ transport/vmpipe/

Author: trustin
Date: Sun Sep 30 03:51:52 2007
New Revision: 580694

URL: http://svn.apache.org/viewvc?rev=580694&view=rev
Log:
* Removed unused classes implemented poorly
* Added IoService.isActive() and replaced IoAcceptor.isBound() with it
* Added IoServiceListenerSupport.isActive(); AbstractIoConnector uses it
* Moved all calls to IoSession.increaseXXX() to DefaultIoFilterChain
** Removed duplicate calls in NioDatagramAcceptor
** Moved IoSession.increaseReadMessages() call from ProtocolCodecFilter to DefaultIoFilterChain because it's more accurate when more than one ProtocolCodecFilter is used.
* Changed all unsafe operations in AbstractIoSession and AbstractIoService 'private'.
* Resolved DIRMINA-317 (add supporting statistical methods to IoService)
** Added getActivationTime() - activationTime is more useful than creationTime
** Added getReadBytes()
** Added getWrittenBytes()
** Added getReadMessages()
** Added getWrittenMessages()
** Added getScheduledWriteBytes()
** Added getScheduledWriteMessages()


Removed:
    mina/trunk/core/src/main/java/org/apache/mina/common/IoAcceptorWrapper.java
    mina/trunk/core/src/main/java/org/apache/mina/common/IoConnectorWrapper.java
Modified:
    mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoAcceptor.java
    mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java
    mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
    mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java
    mina/trunk/core/src/main/java/org/apache/mina/common/IoAcceptor.java
    mina/trunk/core/src/main/java/org/apache/mina/common/IoService.java
    mina/trunk/core/src/main/java/org/apache/mina/common/IoServiceListenerSupport.java
    mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSession.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoAcceptor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoAcceptor.java?rev=580694&r1=580693&r2=580694&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoAcceptor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoAcceptor.java Sun Sep 30 03:51:52 2007
@@ -125,7 +125,7 @@
         getListeners().fireServiceDeactivated();
     }
 
-    public boolean isBound() {
+    public boolean isActive() {
         synchronized (bindLock) {
             return bound;
         }

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java?rev=580694&r1=580693&r2=580694&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java Sun Sep 30 03:51:52 2007
@@ -20,6 +20,7 @@
 package org.apache.mina.common;
 
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 
 
 /**
@@ -43,6 +44,14 @@
      * Maintains the {@link IoServiceListener}s of this service.
      */
     private final IoServiceListenerSupport listeners;
+    
+    private volatile long activationTime;
+    private final AtomicLong readBytes = new AtomicLong();
+    private final AtomicLong writtenBytes = new AtomicLong();
+    private final AtomicLong readMessages = new AtomicLong();
+    private final AtomicLong writtenMessages = new AtomicLong();
+    private final AtomicLong scheduledWriteBytes = new AtomicLong();
+    private final AtomicLong scheduledWriteMessages = new AtomicLong();
 
     /**
      * The default {@link IoSessionConfig} which will be used to configure new sessions.
@@ -93,6 +102,10 @@
         getListeners().remove(listener);
     }
 
+    public boolean isActive() {
+        return getListeners().isActive();
+    }
+
     public Set<IoSession> getManagedSessions() {
         return getListeners().getManagedSessions();
     }
@@ -117,6 +130,64 @@
         return sessionConfig;
     }
     
+    public long getReadBytes() {
+        return readBytes.get();
+    }
+    
+    protected void increaseReadBytes(long increment) {
+        readBytes.addAndGet(increment);
+    }
+
+    public long getReadMessages() {
+        return readMessages.get();
+    }
+    
+    protected void increaseReadMessages() {
+        readMessages.incrementAndGet();
+    }
+
+    public long getScheduledWriteBytes() {
+        return scheduledWriteBytes.get();
+    }
+    
+    protected void increaseScheduledWriteBytes(long increment) {
+        scheduledWriteBytes.addAndGet(increment);
+    }
+
+    public long getScheduledWriteMessages() {
+        return scheduledWriteMessages.get();
+    }
+    
+    protected void increaseScheduledWriteMessages() {
+        scheduledWriteMessages.incrementAndGet();
+    }
+
+    public long getActivationTime() {
+        return activationTime;
+    }
+    
+    protected void setActivationTime(long activationTime) {
+        this.activationTime = activationTime;
+    }
+
+    public long getWrittenBytes() {
+        return writtenBytes.get();
+    }
+    
+    protected void increaseWrittenBytes(long increment) {
+        writtenBytes.addAndGet(increment);
+        scheduledWriteBytes.addAndGet(-increment);
+    }
+
+    public long getWrittenMessages() {
+        return writtenMessages.get();
+    }
+    
+    protected void increaseWrittenMessages() {
+        writtenMessages.incrementAndGet();
+        scheduledWriteMessages.decrementAndGet();
+    }
+
     protected static class ServiceOperationFuture extends DefaultIoFuture {
         public ServiceOperationFuture() {
             super(null);

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java?rev=580694&r1=580693&r2=580694&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java Sun Sep 30 03:51:52 2007
@@ -124,11 +124,11 @@
         return closeFuture;
     }
 
-    public boolean isScheduledForFlush() {
+    protected boolean isScheduledForFlush() {
         return scheduledForFlush.get();
     }
 
-    public boolean setScheduledForFlush(boolean flag) {
+    protected boolean setScheduledForFlush(boolean flag) {
         if (flag) {
             return scheduledForFlush.compareAndSet(false, true);
         } else {
@@ -388,16 +388,20 @@
         return scheduledWriteMessages.get();
     }
 
-    public void increaseReadBytes(int increment) {
+    protected void increaseReadBytes(long increment) {
         if (increment > 0) {
             readBytes += increment;
             lastReadTime = System.currentTimeMillis();
             idleCountForBoth = 0;
             idleCountForRead = 0;
+            
+            if (getService() instanceof AbstractIoService) {
+                ((AbstractIoService) getService()).increaseReadBytes(increment);
+            }
         }
     }
 
-    public void increaseWrittenBytes(long increment) {
+    protected void increaseWrittenBytes(long increment) {
         if (increment > 0) {
             writtenBytes += increment;
             lastWriteTime = System.currentTimeMillis();
@@ -405,27 +409,43 @@
             idleCountForWrite = 0;
 
             scheduledWriteBytes.addAndGet(-increment);
+            
+            if (getService() instanceof AbstractIoService) {
+                ((AbstractIoService) getService()).increaseWrittenBytes(increment);
+            }
         }
     }
 
-    public void increaseReadMessages() {
+    protected void increaseReadMessages() {
         readMessages++;
+        if (getService() instanceof AbstractIoService) {
+            ((AbstractIoService) getService()).increaseReadMessages();
+        }
     }
 
-    public void increaseWrittenMessages() {
+    protected void increaseWrittenMessages() {
         writtenMessages++;
         scheduledWriteMessages.decrementAndGet();
+        if (getService() instanceof AbstractIoService) {
+            ((AbstractIoService) getService()).increaseWrittenMessages();
+        }
     }
 
-    public void increaseScheduledWriteBytes(int increment) {
+    protected void increaseScheduledWriteBytes(long increment) {
         scheduledWriteBytes.addAndGet(increment);
+        if (getService() instanceof AbstractIoService) {
+            ((AbstractIoService) getService()).increaseScheduledWriteBytes(increment);
+        }
     }
 
-    public void increaseScheduledWriteMessages() {
+    protected void increaseScheduledWriteMessages() {
         scheduledWriteMessages.incrementAndGet();
+        if (getService() instanceof AbstractIoService) {
+            ((AbstractIoService) getService()).increaseScheduledWriteMessages();
+        }
     }
 
-    public Queue<WriteRequest> getWriteRequestQueue() {
+    protected Queue<WriteRequest> getWriteRequestQueue() {
         return writeRequestQueue;
     }
 
@@ -493,7 +513,7 @@
         throw new IllegalArgumentException("Unknown idle status: " + status);
     }
 
-    public void increaseIdleCount(IdleStatus status) {
+    protected void increaseIdleCount(IdleStatus status) {
         if (status == IdleStatus.BOTH_IDLE) {
             idleCountForBoth++;
             lastIdleTimeForBoth = System.currentTimeMillis();

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java?rev=580694&r1=580693&r2=580694&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java Sun Sep 30 03:51:52 2007
@@ -656,6 +656,7 @@
         @Override
         public void messageReceived(NextFilter nextFilter, IoSession session,
                 Object message) throws Exception {
+            ((AbstractIoSession) session).increaseReadMessages();
             session.getHandler().messageReceived(session, message);
         }
 

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IoAcceptor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoAcceptor.java?rev=580694&r1=580693&r2=580694&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoAcceptor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoAcceptor.java Sun Sep 30 03:51:52 2007
@@ -80,11 +80,6 @@
     void unbind();
 
     /**
-     * Returns <tt>true</tt> if and if only this service is bound to the local address.
-     */
-    boolean isBound();
-
-    /**
      * (Optional) Returns an {@link IoSession} that is bound to the current
      * local address and the specified <tt>remoteAddress</tt> which reuses
      * the local address that is already bound by this service.

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IoService.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoService.java?rev=580694&r1=580693&r2=580694&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoService.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoService.java Sun Sep 30 03:51:52 2007
@@ -101,4 +101,21 @@
      *                               not a {@link DefaultIoFilterChainBuilder}
      */
     DefaultIoFilterChainBuilder getFilterChain();
+    
+    boolean isActive();
+    
+    long getActivationTime();
+    
+    long getReadBytes();
+    
+    long getWrittenBytes();
+    
+    long getReadMessages();
+    
+    long getWrittenMessages();
+    
+    long getScheduledWriteBytes();
+    
+    long getScheduledWriteMessages();
+    
 }

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IoServiceListenerSupport.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoServiceListenerSupport.java?rev=580694&r1=580693&r2=580694&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoServiceListenerSupport.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoServiceListenerSupport.java Sun Sep 30 03:51:52 2007
@@ -84,6 +84,10 @@
     public Set<IoSession> getManagedSessions() {
         return readOnlyManagedSessions;
     }
+    
+    public boolean isActive() {
+        return activated.get();
+    }
 
     /**
      * Calls {@link IoServiceListener#serviceActivated(IoService)}
@@ -93,6 +97,10 @@
         if (!activated.compareAndSet(false, true)) {
             return;
         }
+        
+        if (service instanceof AbstractIoService) {
+            ((AbstractIoService) service).setActivationTime(System.currentTimeMillis());
+        }
 
         for (IoServiceListener l : listeners) {
             l.serviceActivated(service);
@@ -114,6 +122,9 @@
             }
         } finally {
             disconnectSessions();
+            if (service instanceof AbstractIoService) {
+                ((AbstractIoService) service).setActivationTime(0);
+            }
         }
     }
 

Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java?rev=580694&r1=580693&r2=580694&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java Sun Sep 30 03:51:52 2007
@@ -22,7 +22,6 @@
 import java.net.SocketAddress;
 import java.util.Queue;
 
-import org.apache.mina.common.AbstractIoSession;
 import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.DefaultWriteFuture;
 import org.apache.mina.common.DefaultWriteRequest;
@@ -342,14 +341,6 @@
                 IoSession session, NextFilter nextFilter) {
             this.session = session;
             this.nextFilter = nextFilter;
-        }
-
-        @Override
-        public void write(Object message) {
-            super.write(message);
-            if (session instanceof AbstractIoSession) {
-                ((AbstractIoSession) session).increaseReadMessages();
-            }
         }
 
         public void flush() {

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java?rev=580694&r1=580693&r2=580694&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java Sun Sep 30 03:51:52 2007
@@ -163,7 +163,7 @@
         }
 
         synchronized (bindLock) {
-            if (!isBound()) {
+            if (!isActive()) {
                 throw new IllegalStateException(
                         "Can't create a session from a unbound service.");
             }
@@ -210,7 +210,7 @@
 
     public void setSessionRecycler(IoSessionRecycler sessionRecycler) {
         synchronized (bindLock) {
-            if (isBound()) {
+            if (isActive()) {
                 throw new IllegalStateException(
                         "sessionRecycler can't be set while the acceptor is bound.");
             }
@@ -349,7 +349,6 @@
             newBuf.put(readBuf);
             newBuf.flip();
 
-            session.increaseReadBytes(newBuf.remaining());
             session.getFilterChain().fireMessageReceived(newBuf);
         }
     }
@@ -391,46 +390,40 @@
 
         int writtenBytes = 0;
         int maxWrittenBytes = session.getConfig().getSendBufferSize() << 1;
-        try {
-            for (; ;) {
-                WriteRequest req = writeRequestQueue.peek();
-                if (req == null) {
-                    break;
-                }
+        for (; ;) {
+            WriteRequest req = writeRequestQueue.peek();
+            if (req == null) {
+                break;
+            }
 
-                ByteBuffer buf = (ByteBuffer) req.getMessage();
-                if (buf.remaining() == 0) {
-                    // pop and fire event
-                    writeRequestQueue.poll();
-                    session.increaseWrittenMessages();
-                    buf.reset();
-                    session.getFilterChain().fireMessageSent(req);
-                    continue;
-                }
+            ByteBuffer buf = (ByteBuffer) req.getMessage();
+            if (buf.remaining() == 0) {
+                // pop and fire event
+                writeRequestQueue.poll();
+                buf.reset();
+                session.getFilterChain().fireMessageSent(req);
+                continue;
+            }
 
-                SocketAddress destination = req.getDestination();
-                if (destination == null) {
-                    destination = session.getRemoteAddress();
-                }
+            SocketAddress destination = req.getDestination();
+            if (destination == null) {
+                destination = session.getRemoteAddress();
+            }
 
-                int localWrittenBytes = ch.send(buf.buf(), destination);
-                if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
-                    // Kernel buffer is full or wrote too much
-                    key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
-                    return false;
-                } else {
-                    key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
-
-                    // pop and fire event
-                    writeRequestQueue.poll();
-                    writtenBytes += localWrittenBytes;
-                    session.increaseWrittenMessages();
-                    buf.reset();
-                    session.getFilterChain().fireMessageSent(req);
-                }
+            int localWrittenBytes = ch.send(buf.buf(), destination);
+            if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
+                // Kernel buffer is full or wrote too much
+                key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+                return false;
+            } else {
+                key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
+
+                // pop and fire event
+                writeRequestQueue.poll();
+                writtenBytes += localWrittenBytes;
+                buf.reset();
+                session.getFilterChain().fireMessageSent(req);
             }
-        } finally {
-            session.increaseWrittenBytes(writtenBytes);
         }
 
         return true;

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSession.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSession.java?rev=580694&r1=580693&r2=580694&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSession.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSession.java Sun Sep 30 03:51:52 2007
@@ -24,6 +24,7 @@
 import java.net.SocketException;
 import java.nio.channels.DatagramChannel;
 import java.nio.channels.SelectionKey;
+import java.util.Queue;
 
 import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.DefaultIoFilterChain;
@@ -35,6 +36,7 @@
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.RuntimeIoException;
 import org.apache.mina.common.TransportMetadata;
+import org.apache.mina.common.WriteRequest;
 import org.apache.mina.transport.socket.AbstractDatagramSessionConfig;
 import org.apache.mina.transport.socket.DatagramSession;
 import org.apache.mina.transport.socket.DatagramSessionConfig;
@@ -148,6 +150,21 @@
     @Override
     public InetSocketAddress getServiceAddress() {
         return (InetSocketAddress) super.getServiceAddress();
+    }
+
+    @Override
+    protected Queue<WriteRequest> getWriteRequestQueue() {
+        return super.getWriteRequestQueue();
+    }
+
+    @Override
+    protected boolean isScheduledForFlush() {
+        return super.isScheduledForFlush();
+    }
+
+    @Override
+    protected boolean setScheduledForFlush(boolean flag) {
+        return super.setScheduledForFlush(flag);
     }
 
     private class SessionConfigImpl extends AbstractDatagramSessionConfig {

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java?rev=580694&r1=580693&r2=580694&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java Sun Sep 30 03:51:52 2007
@@ -194,7 +194,7 @@
 
     public void setReuseAddress(boolean reuseAddress) {
         synchronized (bindLock) {
-            if (isBound()) {
+            if (isActive()) {
                 throw new IllegalStateException(
                         "backlog can't be set while the acceptor is bound.");
             }
@@ -209,7 +209,7 @@
 
     public void setBacklog(int backlog) {
         synchronized (bindLock) {
-            if (isBound()) {
+            if (isActive()) {
                 throw new IllegalStateException(
                         "backlog can't be set while the acceptor is bound.");
             }

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java?rev=580694&r1=580693&r2=580694&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java Sun Sep 30 03:51:52 2007
@@ -19,6 +19,7 @@
  */
 package org.apache.mina.transport.vmpipe;
 
+import java.util.Queue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.locks.Lock;
@@ -33,6 +34,7 @@
 import org.apache.mina.common.IoServiceListenerSupport;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.TransportMetadata;
+import org.apache.mina.common.WriteRequest;
 
 /**
  * A {@link IoSession} for in-VM transport (VM_PIPE).
@@ -148,6 +150,10 @@
     @Override
     public VmPipeAddress getServiceAddress() {
         return serviceAddress;
+    }
+
+    protected Queue<WriteRequest> getWriteRequestQueue() {
+        return super.getWriteRequestQueue();
     }
 
     Lock getLock() {