You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2010/03/04 12:17:48 UTC

svn commit: r918939 - in /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid: qmf/QMFService.java server/AMQChannel.java server/configuration/SessionConfig.java server/transport/ServerSession.java

Author: robbie
Date: Thu Mar  4 11:17:48 2010
New Revision: 918939

URL: http://svn.apache.org/viewvc?rev=918939&view=rev
Log:
QPID-2379: add TxnStarts, TxnCommits, TxnRejects, TxnCount on Session delegate

Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java?rev=918939&r1=918938&r2=918939&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java Thu Mar  4 11:17:48 2010
@@ -1300,26 +1300,22 @@
 
         public Long getTxnStarts()
         {
-            // TODO
-            return 0l;
+            return _obj.getTxnStarts();
         }
 
         public Long getTxnCommits()
         {
-            // TODO
-            return 0l;
+            return _obj.getTxnCommits();
         }
 
         public Long getTxnRejects()
         {
-            // TODO
-            return 0l;
+            return _obj.getTxnRejects();
         }
 
         public Long getTxnCount()
         {
-            // TODO
-            return 0l;
+            return _obj.getTxnCount();
         }
 
         public Long getClientCredit()

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=918939&r1=918938&r2=918939&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Thu Mar  4 11:17:48 2010
@@ -84,6 +84,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class AMQChannel implements SessionConfig
 {
@@ -132,6 +133,11 @@
     private final AtomicBoolean _suspended = new AtomicBoolean(false);
 
     private ServerTransaction _transaction;
+    
+    private final AtomicLong _txnStarts = new AtomicLong(0);
+    private final AtomicLong _txnCommits = new AtomicLong(0);
+    private final AtomicLong _txnRejects = new AtomicLong(0);
+    private final AtomicLong _txnCount = new AtomicLong(0);
 
     // Why do we need this reference ? - ritchiem
     private final AMQProtocolSession _session;
@@ -180,6 +186,7 @@
     public void setLocalTransactional()
     {
         _transaction = new LocalTransaction(_messageStore);
+        _txnStarts.incrementAndGet();
     }
 
     public boolean isTransactional()
@@ -189,6 +196,40 @@
         // theory
         return !(_transaction instanceof AutoCommitTransaction);
     }
+    
+    private void incrementOutstandingTxnsIfNecessary()
+    {
+        //Only LocalTransaction is currently supported, so at most one transaction
+        //can be outstanding at a time. Atomically set the count to 1 if it is 0.
+        _txnCount.compareAndSet(0,1);
+    }
+    
+    private void decrementOutstandingTxnsIfNecessary()
+    {
+        //Only LocalTransaction is currently supported, so at most one transaction
+        //can be outstanding at a time. Atomically set the count to 0 if it is 1.
+        _txnCount.compareAndSet(1,0);
+    }
+
+    public Long getTxnStarts()
+    {
+        return _txnStarts.get();
+    }
+
+    public Long getTxnCommits()
+    {
+        return _txnCommits.get();
+    }
+
+    public Long getTxnRejects()
+    {
+        return _txnRejects.get();
+    }
+
+    public Long getTxnCount()
+    {
+        return _txnCount.get();
+    }
 
     public int getChannelId()
     {
@@ -278,7 +319,7 @@
                     else
                     {
                         _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues));
-
+                        incrementOutstandingTxnsIfNecessary();
                     }
                 }
             }
@@ -845,6 +886,9 @@
 
         _transaction.commit();
 
+        _txnCommits.incrementAndGet();
+        _txnStarts.incrementAndGet();
+        decrementOutstandingTxnsIfNecessary();
     }
 
     public void rollback() throws AMQException
@@ -877,6 +921,10 @@
         finally
         {
             _rollingBack = false;
+            
+            _txnRejects.incrementAndGet();
+            _txnStarts.incrementAndGet();
+            decrementOutstandingTxnsIfNecessary();
         }
 
         postRollbackTask.run();

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java?rev=918939&r1=918938&r2=918939&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java Thu Mar  4 11:17:48 2010
@@ -38,4 +38,12 @@
     Long getExpiryTime();
 
     Long getMaxClientRate();
+    
+    Long getTxnStarts();
+
+    Long getTxnCommits();
+
+    Long getTxnRejects();
+
+    Long getTxnCount();
 }
\ No newline at end of file

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=918939&r1=918938&r2=918939&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Thu Mar  4 11:17:48 2010
@@ -61,6 +61,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class ServerSession extends Session implements PrincipalHolder, SessionConfig
 {
@@ -92,6 +93,11 @@
             new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>();
 
     private ServerTransaction _transaction;
+    
+    private final AtomicLong _txnStarts = new AtomicLong(0);
+    private final AtomicLong _txnCommits = new AtomicLong(0);
+    private final AtomicLong _txnRejects = new AtomicLong(0);
+    private final AtomicLong _txnCount = new AtomicLong(0);
 
     private Principal _principal;
 
@@ -160,7 +166,7 @@
                 }
             });
 
-
+            incrementOutstandingTxnsIfNecessary();
     }
 
 
@@ -391,13 +397,56 @@
     public void commit()
     {
         _transaction.commit();
+        
+        _txnCommits.incrementAndGet();
+        _txnStarts.incrementAndGet();
+        decrementOutstandingTxnsIfNecessary();
     }
 
     public void rollback()
     {
         _transaction.rollback();
+        
+        _txnRejects.incrementAndGet();
+        _txnStarts.incrementAndGet();
+        decrementOutstandingTxnsIfNecessary();
+    }
+
+    
+    private void incrementOutstandingTxnsIfNecessary()
+    {
+        //Only LocalTransaction is currently supported, so at most one transaction
+        //can be outstanding at a time. Atomically set the count to 1 if it is 0.
+        _txnCount.compareAndSet(0,1);
+    }
+    
+    private void decrementOutstandingTxnsIfNecessary()
+    {
+        //Only LocalTransaction is currently supported, so at most one transaction
+        //can be outstanding at a time. Atomically set the count to 0 if it is 1.
+        _txnCount.compareAndSet(1,0);
+    }
+
+    public Long getTxnStarts()
+    {
+        return _txnStarts.get();
     }
 
+    public Long getTxnCommits()
+    {
+        return _txnCommits.get();
+    }
+
+    public Long getTxnRejects()
+    {
+        return _txnRejects.get();
+    }
+
+    public Long getTxnCount()
+    {
+        return _txnCount.get();
+    }
+    
     public Principal getPrincipal()
     {
         return _principal;



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org