You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2007/01/30 10:53:45 UTC

svn commit: r501360 - in /mina: branches/1.0/core/src/main/java/org/apache/mina/management/ branches/1.1/core/src/main/java/org/apache/mina/management/ trunk/core/src/main/java/org/apache/mina/management/

Author: trustin
Date: Tue Jan 30 01:53:44 2007
New Revision: 501360

URL: http://svn.apache.org/viewvc?view=rev&rev=501360
Log:
Resolved issues:
* DIRMINA-331 (StatCollector is not thread safe and some stats are being mixed up)
* DIRMINA-332 (Improve performance of StatCollector)
** Applied Gaston's patch (partially modified to keep backward compatibility)


Modified:
    mina/branches/1.0/core/src/main/java/org/apache/mina/management/IoSessionStat.java
    mina/branches/1.0/core/src/main/java/org/apache/mina/management/StatCollector.java
    mina/branches/1.1/core/src/main/java/org/apache/mina/management/IoSessionStat.java
    mina/branches/1.1/core/src/main/java/org/apache/mina/management/StatCollector.java
    mina/trunk/core/src/main/java/org/apache/mina/management/IoSessionStat.java
    mina/trunk/core/src/main/java/org/apache/mina/management/StatCollector.java

Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/management/IoSessionStat.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/management/IoSessionStat.java?view=diff&rev=501360&r1=501359&r2=501360
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/management/IoSessionStat.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/management/IoSessionStat.java Tue Jan 30 01:53:44 2007
@@ -19,6 +19,8 @@
  */
 package org.apache.mina.management;
 
+import org.apache.mina.common.IoSession;
+
 /**
  * The collected stats for a session. It's used by {@link StatCollector} to attach
  * throughput stats to an {@link IoSession}. You can accces a session stat using 
@@ -45,7 +47,7 @@
     float messageReadThroughput = 0;
 
     // last time the session was polled
-    long lastPollingTime;
+    long lastPollingTime = System.currentTimeMillis();
     
     /**
      * Bytes read per second  

Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/management/StatCollector.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/management/StatCollector.java?view=diff&rev=501360&r1=501359&r2=501360
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/management/StatCollector.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/management/StatCollector.java Tue Jan 30 01:53:44 2007
@@ -21,9 +21,7 @@
 
 
 import java.net.SocketAddress;
-import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.List;
 
 import org.apache.mina.common.IoHandler;
 import org.apache.mina.common.IoService;
@@ -31,6 +29,10 @@
 import org.apache.mina.common.IoServiceListener;
 import org.apache.mina.common.IoSession;
 
+import edu.emory.mathcs.backport.java.util.Queue;
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentLinkedQueue;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicLong;
+
 /**
  * Collects statistics of an {@link IoService}. It's polling all the sessions of a given
  * IoService. It's attaching a {@link IoSessionStat} object to all the sessions polled
@@ -66,10 +68,10 @@
     private final IoService service;
     private Worker worker;
     private int pollingInterval = 5000;
-    private List polledSessions;
+    private Queue polledSessions;
 
     // resume of session stats, for simplifying acces to the statistics 
-    private long totalProcessedSessions = 0L;
+    private AtomicLong totalProcessedSessions = new AtomicLong();
     private float msgWrittenThroughput = 0f;
     private float msgReadThroughput = 0f;
     private float bytesWrittenThroughput = 0f;
@@ -131,7 +133,7 @@
     
             // add all current sessions
     
-            polledSessions = new ArrayList();
+            polledSessions = new ConcurrentLinkedQueue();
     
             for ( Iterator iter = service.getManagedServiceAddresses().iterator(); iter.hasNext(); )
             {
@@ -202,36 +204,32 @@
 
     private void addSession( IoSession session )
     {
-        synchronized (this) 
-        {
-            totalProcessedSessions += 1;
-            polledSessions.add( session );
-            IoSessionStat sessionStats = new IoSessionStat();
-            sessionStats.lastPollingTime = System.currentTimeMillis();
-            session.setAttribute( KEY, sessionStats );
-        }
+        IoSessionStat sessionStats = new IoSessionStat();
+        session.setAttribute( KEY, sessionStats );
+        totalProcessedSessions.incrementAndGet();
+        polledSessions.add( session );
     }
 
     private void removeSession( IoSession session )
     {
-        synchronized (this)
-        {
-            // remove the session from the list of polled sessions
-            polledSessions.remove( session );
-            
-            // add the bytes processed between last polling and session closing
-            // prevent non seen byte with non-connected protocols like HTTP and datagrams
-            IoSessionStat sessStat = ( IoSessionStat ) session.getAttribute( KEY );
-            
-            // computing with time between polling and closing
-            bytesReadThroughput += (session.getReadBytes() - sessStat.lastByteRead) /  ( ( System.currentTimeMillis() - sessStat.lastPollingTime ) /1000f ) ;
-            bytesWrittenThroughput += (session.getWrittenBytes() - sessStat.lastByteWrite) /  ( ( System.currentTimeMillis() - sessStat.lastPollingTime ) /1000f ) ;
-            msgReadThroughput += (session.getReadMessages() - sessStat.lastMessageRead) /  ( ( System.currentTimeMillis() - sessStat.lastPollingTime ) /1000f ) ;
-            msgWrittenThroughput += (session.getWrittenMessages() - sessStat.lastMessageWrite) /  ( ( System.currentTimeMillis() - sessStat.lastPollingTime ) /1000f ) ;
-                        
-            session.removeAttribute( KEY );
+        // remove the session from the list of polled sessions
+        polledSessions.remove( session );
+        
+        // add the bytes processed between last polling and session closing
+        // prevent non seen byte with non-connected protocols like HTTP and datagrams
+        IoSessionStat sessStat = ( IoSessionStat ) session.getAttribute( KEY );
         
+        // computing with time between polling and closing
+        long currentTime = System.currentTimeMillis();
+        synchronized( this )
+        {
+            bytesReadThroughput += (session.getReadBytes() - sessStat.lastByteRead) /  ( ( currentTime - sessStat.lastPollingTime ) /1000f ) ;
+            bytesWrittenThroughput += (session.getWrittenBytes() - sessStat.lastByteWrite) /  ( ( currentTime - sessStat.lastPollingTime ) /1000f ) ;
+            msgReadThroughput += (session.getReadMessages() - sessStat.lastMessageRead) /  ( ( currentTime - sessStat.lastPollingTime ) /1000f ) ;
+            msgWrittenThroughput += (session.getWrittenMessages() - sessStat.lastMessageWrite) /  ( ( currentTime - sessStat.lastPollingTime ) /1000f ) ;
         }
+                    
+        session.removeAttribute( KEY );
     }
 
 
@@ -241,7 +239,7 @@
      */
     public long getTotalProcessedSessions()
     {
-        return totalProcessedSessions;
+        return totalProcessedSessions.get();
     }
     
     public float getBytesReadThroughput()
@@ -331,12 +329,14 @@
                         / ( pollingInterval / 1000f );
                     tmpMsgWrittenThroughput += sessStat.messageWrittenThroughput;
 
-                    msgWrittenThroughput = tmpMsgWrittenThroughput;
-                    msgReadThroughput = tmpMsgReadThroughput;
-                    bytesWrittenThroughput = tmpBytesWrittenThroughput;
-                    bytesReadThroughput = tmpBytesReadThroughput;
-                    
-                    sessStat.lastPollingTime = System.currentTimeMillis();
+                    synchronized( StatCollector.this )
+                    {
+                        msgWrittenThroughput = tmpMsgWrittenThroughput;
+                        msgReadThroughput = tmpMsgReadThroughput;
+                        bytesWrittenThroughput = tmpBytesWrittenThroughput;
+                        bytesReadThroughput = tmpBytesReadThroughput;
+                        sessStat.lastPollingTime = System.currentTimeMillis();
+                    }
                 }
             }
         }

Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/management/IoSessionStat.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/management/IoSessionStat.java?view=diff&rev=501360&r1=501359&r2=501360
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/management/IoSessionStat.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/management/IoSessionStat.java Tue Jan 30 01:53:44 2007
@@ -19,6 +19,8 @@
  */
 package org.apache.mina.management;
 
+import org.apache.mina.common.IoSession;
+
 /**
  * The collected stats for a session. It's used by {@link StatCollector} to attach
  * throughput stats to an {@link IoSession}. You can accces a session stat using 
@@ -45,7 +47,7 @@
     float messageReadThroughput = 0;
 
     //  last time the session was polled
-    long lastPollingTime;
+    long lastPollingTime = System.currentTimeMillis();
     
     /**
      * Bytes read per second  

Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/management/StatCollector.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/management/StatCollector.java?view=diff&rev=501360&r1=501359&r2=501360
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/management/StatCollector.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/management/StatCollector.java Tue Jan 30 01:53:44 2007
@@ -21,9 +21,10 @@
 
 
 import java.net.SocketAddress;
-import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.mina.common.IoHandler;
 import org.apache.mina.common.IoService;
@@ -66,10 +67,10 @@
     private final IoService service;
     private Worker worker;
     private int pollingInterval = 5000;
-    private List<IoSession> polledSessions;
+    private Queue<IoSession> polledSessions;
 
     // resume of session stats, for simplifying acces to the statistics 
-    private long totalProcessedSessions = 0L;
+    private AtomicLong totalProcessedSessions = new AtomicLong();
     private float msgWrittenThroughput = 0f;
     private float msgReadThroughput = 0f;
     private float bytesWrittenThroughput = 0f;
@@ -131,7 +132,7 @@
     
             // add all current sessions
     
-            polledSessions = new ArrayList<IoSession>();
+            polledSessions = new ConcurrentLinkedQueue<IoSession>();
     
             for ( Iterator<SocketAddress> iter = service.getManagedServiceAddresses().iterator(); iter.hasNext(); )
             {
@@ -202,36 +203,32 @@
 
     private void addSession( IoSession session )
     {
-        synchronized (this) 
-        {
-            totalProcessedSessions += 1;
-            polledSessions.add( session );
-            IoSessionStat sessionStats = new IoSessionStat();
-            sessionStats.lastPollingTime = System.currentTimeMillis();
-            session.setAttribute( KEY, sessionStats );
-        }
+        IoSessionStat sessionStats = new IoSessionStat();
+        session.setAttribute( KEY, sessionStats );
+        totalProcessedSessions.incrementAndGet();
+        polledSessions.add( session );
     }
 
     private void removeSession( IoSession session )
     {
-        synchronized (this)
-        {
-            // remove the session from the list of polled sessions
-            polledSessions.remove( session );
-            
-            // add the bytes processed between last polling and session closing
-            // prevent non seen byte with non-connected protocols like HTTP and datagrams
-            IoSessionStat sessStat = ( IoSessionStat ) session.getAttribute( KEY );
-            
-            // computing with time between polling and closing
-            bytesReadThroughput += (session.getReadBytes() - sessStat.lastByteRead) /  ( ( System.currentTimeMillis() - sessStat.lastPollingTime ) /1000f ) ;
-            bytesWrittenThroughput += (session.getWrittenBytes() - sessStat.lastByteWrite) /  ( ( System.currentTimeMillis() - sessStat.lastPollingTime ) /1000f ) ;
-            msgReadThroughput += (session.getReadMessages() - sessStat.lastMessageRead) /  ( ( System.currentTimeMillis() - sessStat.lastPollingTime ) /1000f ) ;
-            msgWrittenThroughput += (session.getWrittenMessages() - sessStat.lastMessageWrite) /  ( ( System.currentTimeMillis() - sessStat.lastPollingTime ) /1000f ) ;
-                        
-            session.removeAttribute( KEY );
+        // remove the session from the list of polled sessions
+        polledSessions.remove( session );
         
+        // add the bytes processed between last polling and session closing
+        // prevent non seen byte with non-connected protocols like HTTP and datagrams
+        IoSessionStat sessStat = ( IoSessionStat ) session.getAttribute( KEY );
+        
+        // computing with time between polling and closing
+        long currentTime = System.currentTimeMillis();
+        synchronized( this )
+        {
+            bytesReadThroughput += (session.getReadBytes() - sessStat.lastByteRead) /  ( ( currentTime - sessStat.lastPollingTime ) /1000f ) ;
+            bytesWrittenThroughput += (session.getWrittenBytes() - sessStat.lastByteWrite) /  ( ( currentTime - sessStat.lastPollingTime ) /1000f ) ;
+            msgReadThroughput += (session.getReadMessages() - sessStat.lastMessageRead) /  ( ( currentTime - sessStat.lastPollingTime ) /1000f ) ;
+            msgWrittenThroughput += (session.getWrittenMessages() - sessStat.lastMessageWrite) /  ( ( currentTime - sessStat.lastPollingTime ) /1000f ) ;
         }
+                    
+        session.removeAttribute( KEY );
     }
 
 
@@ -241,7 +238,7 @@
      */
     public long getTotalProcessedSessions()
     {
-        return totalProcessedSessions;
+        return totalProcessedSessions.get();
     }
     
     public float getBytesReadThroughput()
@@ -331,12 +328,14 @@
                         / ( pollingInterval / 1000f );
                     tmpMsgWrittenThroughput += sessStat.messageWrittenThroughput;
 
-                    msgWrittenThroughput = tmpMsgWrittenThroughput;
-                    msgReadThroughput = tmpMsgReadThroughput;
-                    bytesWrittenThroughput = tmpBytesWrittenThroughput;
-                    bytesReadThroughput = tmpBytesReadThroughput;
-                    
-                    sessStat.lastPollingTime = System.currentTimeMillis();
+                    synchronized( StatCollector.this )
+                    {
+                        msgWrittenThroughput = tmpMsgWrittenThroughput;
+                        msgReadThroughput = tmpMsgReadThroughput;
+                        bytesWrittenThroughput = tmpBytesWrittenThroughput;
+                        bytesReadThroughput = tmpBytesReadThroughput;
+                        sessStat.lastPollingTime = System.currentTimeMillis();
+                    }
                 }
             }
         }

Modified: mina/trunk/core/src/main/java/org/apache/mina/management/IoSessionStat.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/management/IoSessionStat.java?view=diff&rev=501360&r1=501359&r2=501360
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/management/IoSessionStat.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/management/IoSessionStat.java Tue Jan 30 01:53:44 2007
@@ -47,7 +47,7 @@
     float messageReadThroughput = 0;
     
     // last time the session was polled
-    long lastPollingTime;
+    long lastPollingTime = System.currentTimeMillis();
     
     /**
      * Bytes read per second  

Modified: mina/trunk/core/src/main/java/org/apache/mina/management/StatCollector.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/management/StatCollector.java?view=diff&rev=501360&r1=501359&r2=501360
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/management/StatCollector.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/management/StatCollector.java Tue Jan 30 01:53:44 2007
@@ -20,9 +20,10 @@
 package org.apache.mina.management;
 
 
-import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.mina.common.IoService;
 import org.apache.mina.common.IoServiceListener;
@@ -63,10 +64,10 @@
     private final IoService service;
     private Worker worker;
     private int pollingInterval = 5000;
-    private List<IoSession> polledSessions;
+    private Queue<IoSession> polledSessions;
 
     // resume of session stats, for simplifying acces to the statistics 
-    private long totalProcessedSessions = 0L;
+    private AtomicLong totalProcessedSessions = new AtomicLong();
     private float msgWrittenThroughput = 0f;
     private float msgReadThroughput = 0f;
     private float bytesWrittenThroughput = 0f;
@@ -126,7 +127,7 @@
     
             // add all current sessions
     
-            polledSessions = new ArrayList<IoSession>();
+            polledSessions = new ConcurrentLinkedQueue<IoSession>();
             
             for ( Iterator<IoSession> iter = service.getManagedSessions().iterator(); iter.hasNext(); )
             {
@@ -191,36 +192,32 @@
 
     private void addSession( IoSession session )
     {
-        synchronized (this) 
-        {
-            totalProcessedSessions += 1;
-            polledSessions.add( session );
-            IoSessionStat sessionStats = new IoSessionStat();
-            sessionStats.lastPollingTime = System.currentTimeMillis();
-            session.setAttribute( KEY, sessionStats );
-        }
+        IoSessionStat sessionStats = new IoSessionStat();
+        session.setAttribute( KEY, sessionStats );
+        totalProcessedSessions.incrementAndGet();
+        polledSessions.add( session );
     }
 
     private void removeSession( IoSession session )
     {
-        synchronized (this)
-        {
-            // remove the session from the list of polled sessions
-            polledSessions.remove( session );
-            
-            // add the bytes processed between last polling and session closing
-            // prevent non seen byte with non-connected protocols like HTTP and datagrams
-            IoSessionStat sessStat = ( IoSessionStat ) session.getAttribute( KEY );
-            
-            // computing with time between polling and closing
-            bytesReadThroughput += (session.getReadBytes() - sessStat.lastByteRead) /  ( ( System.currentTimeMillis() - sessStat.lastPollingTime ) /1000f ) ;
-            bytesWrittenThroughput += (session.getWrittenBytes() - sessStat.lastByteWrite) /  ( ( System.currentTimeMillis() - sessStat.lastPollingTime ) /1000f ) ;
-            msgReadThroughput += (session.getReadMessages() - sessStat.lastMessageRead) /  ( ( System.currentTimeMillis() - sessStat.lastPollingTime ) /1000f ) ;
-            msgWrittenThroughput += (session.getWrittenMessages() - sessStat.lastMessageWrite) /  ( ( System.currentTimeMillis() - sessStat.lastPollingTime ) /1000f ) ;
-                        
-            session.removeAttribute( KEY );
+        // remove the session from the list of polled sessions
+        polledSessions.remove( session );
+        
+        // add the bytes processed between last polling and session closing
+        // prevent non seen byte with non-connected protocols like HTTP and datagrams
+        IoSessionStat sessStat = ( IoSessionStat ) session.getAttribute( KEY );
         
+        // computing with time between polling and closing
+        long currentTime = System.currentTimeMillis();
+        synchronized( this )
+        {
+            bytesReadThroughput += (session.getReadBytes() - sessStat.lastByteRead) /  ( ( currentTime - sessStat.lastPollingTime ) /1000f ) ;
+            bytesWrittenThroughput += (session.getWrittenBytes() - sessStat.lastByteWrite) /  ( ( currentTime - sessStat.lastPollingTime ) /1000f ) ;
+            msgReadThroughput += (session.getReadMessages() - sessStat.lastMessageRead) /  ( ( currentTime - sessStat.lastPollingTime ) /1000f ) ;
+            msgWrittenThroughput += (session.getWrittenMessages() - sessStat.lastMessageWrite) /  ( ( currentTime - sessStat.lastPollingTime ) /1000f ) ;
         }
+                    
+        session.removeAttribute( KEY );
     }
 
 
@@ -230,7 +227,7 @@
      */
     public long getTotalProcessedSessions()
     {
-        return totalProcessedSessions;
+        return totalProcessedSessions.get();
     }
     
     public float getBytesReadThroughput()
@@ -320,12 +317,14 @@
                         / ( pollingInterval / 1000f );
                     tmpMsgWrittenThroughput += sessStat.messageWrittenThroughput;
 
-                    msgWrittenThroughput = tmpMsgWrittenThroughput;
-                    msgReadThroughput = tmpMsgReadThroughput;
-                    bytesWrittenThroughput = tmpBytesWrittenThroughput;
-                    bytesReadThroughput = tmpBytesReadThroughput;
-                    
-                    sessStat.lastPollingTime = System.currentTimeMillis();
+                    synchronized( StatCollector.this )
+                    {
+                        msgWrittenThroughput = tmpMsgWrittenThroughput;
+                        msgReadThroughput = tmpMsgReadThroughput;
+                        bytesWrittenThroughput = tmpBytesWrittenThroughput;
+                        bytesReadThroughput = tmpBytesReadThroughput;
+                        sessStat.lastPollingTime = System.currentTimeMillis();
+                    }
                 }
             }
         }