You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2010/03/04 12:17:48 UTC
svn commit: r918939 - in
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid:
qmf/QMFService.java server/AMQChannel.java
server/configuration/SessionConfig.java server/transport/ServerSession.java
Author: robbie
Date: Thu Mar 4 11:17:48 2010
New Revision: 918939
URL: http://svn.apache.org/viewvc?rev=918939&view=rev
Log:
QPID-2379: add TxnStarts, TxnCommits, TxnRejects, TxnCount on Session delegate
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java?rev=918939&r1=918938&r2=918939&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java Thu Mar 4 11:17:48 2010
@@ -1300,26 +1300,22 @@
public Long getTxnStarts()
{
- // TODO
- return 0l;
+ return _obj.getTxnStarts();
}
public Long getTxnCommits()
{
- // TODO
- return 0l;
+ return _obj.getTxnCommits();
}
public Long getTxnRejects()
{
- // TODO
- return 0l;
+ return _obj.getTxnRejects();
}
public Long getTxnCount()
{
- // TODO
- return 0l;
+ return _obj.getTxnCount();
}
public Long getClientCredit()
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=918939&r1=918938&r2=918939&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Thu Mar 4 11:17:48 2010
@@ -84,6 +84,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
public class AMQChannel implements SessionConfig
{
@@ -132,6 +133,11 @@
private final AtomicBoolean _suspended = new AtomicBoolean(false);
private ServerTransaction _transaction;
+
+ private final AtomicLong _txnStarts = new AtomicLong(0);
+ private final AtomicLong _txnCommits = new AtomicLong(0);
+ private final AtomicLong _txnRejects = new AtomicLong(0);
+ private final AtomicLong _txnCount = new AtomicLong(0);
// Why do we need this reference ? - ritchiem
private final AMQProtocolSession _session;
@@ -180,6 +186,7 @@
public void setLocalTransactional()
{
_transaction = new LocalTransaction(_messageStore);
+ _txnStarts.incrementAndGet();
}
public boolean isTransactional()
@@ -189,6 +196,40 @@
// theory
return !(_transaction instanceof AutoCommitTransaction);
}
+
+ private void incrementOutstandingTxnsIfNecessary()
+ {
+ //There can currently only be at most one outstanding transaction
+ //due to only having LocalTransaction support. Set value to 1 if 0.
+ _txnCount.compareAndSet(0,1);
+ }
+
+ private void decrementOutstandingTxnsIfNecessary()
+ {
+ //There can currently only be at most one outstanding transaction
+ //due to only having LocalTransaction support. Set value to 0 if 1.
+ _txnCount.compareAndSet(1,0);
+ }
+
+ public Long getTxnStarts()
+ {
+ return _txnStarts.get();
+ }
+
+ public Long getTxnCommits()
+ {
+ return _txnCommits.get();
+ }
+
+ public Long getTxnRejects()
+ {
+ return _txnRejects.get();
+ }
+
+ public Long getTxnCount()
+ {
+ return _txnCount.get();
+ }
public int getChannelId()
{
@@ -278,7 +319,7 @@
else
{
_transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues));
-
+ incrementOutstandingTxnsIfNecessary();
}
}
}
@@ -845,6 +886,9 @@
_transaction.commit();
+ _txnCommits.incrementAndGet();
+ _txnStarts.incrementAndGet();
+ decrementOutstandingTxnsIfNecessary();
}
public void rollback() throws AMQException
@@ -877,6 +921,10 @@
finally
{
_rollingBack = false;
+
+ _txnRejects.incrementAndGet();
+ _txnStarts.incrementAndGet();
+ decrementOutstandingTxnsIfNecessary();
}
postRollbackTask.run();
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java?rev=918939&r1=918938&r2=918939&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java Thu Mar 4 11:17:48 2010
@@ -38,4 +38,12 @@
Long getExpiryTime();
Long getMaxClientRate();
+
+ Long getTxnStarts();
+
+ Long getTxnCommits();
+
+ Long getTxnRejects();
+
+ Long getTxnCount();
}
\ No newline at end of file
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=918939&r1=918938&r2=918939&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Thu Mar 4 11:17:48 2010
@@ -61,6 +61,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicLong;
public class ServerSession extends Session implements PrincipalHolder, SessionConfig
{
@@ -92,6 +93,11 @@
new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>();
private ServerTransaction _transaction;
+
+ private final AtomicLong _txnStarts = new AtomicLong(0);
+ private final AtomicLong _txnCommits = new AtomicLong(0);
+ private final AtomicLong _txnRejects = new AtomicLong(0);
+ private final AtomicLong _txnCount = new AtomicLong(0);
private Principal _principal;
@@ -160,7 +166,7 @@
}
});
-
+ incrementOutstandingTxnsIfNecessary();
}
@@ -391,13 +397,56 @@
public void commit()
{
_transaction.commit();
+
+ _txnCommits.incrementAndGet();
+ _txnStarts.incrementAndGet();
+ decrementOutstandingTxnsIfNecessary();
}
public void rollback()
{
_transaction.rollback();
+
+ _txnRejects.incrementAndGet();
+ _txnStarts.incrementAndGet();
+ decrementOutstandingTxnsIfNecessary();
+ }
+
+
+ private void incrementOutstandingTxnsIfNecessary()
+ {
+ //There can currently only be at most one outstanding transaction
+ //due to only having LocalTransaction support. Set value to 1 if 0.
+ _txnCount.compareAndSet(0,1);
+ }
+
+ private void decrementOutstandingTxnsIfNecessary()
+ {
+ //There can currently only be at most one outstanding transaction
+ //due to only having LocalTransaction support. Set value to 0 if 1.
+ _txnCount.compareAndSet(1,0);
+ }
+
+ public Long getTxnStarts()
+ {
+ return _txnStarts.get();
}
+ public Long getTxnCommits()
+ {
+ return _txnCommits.get();
+ }
+
+ public Long getTxnRejects()
+ {
+ return _txnRejects.get();
+ }
+
+ public Long getTxnCount()
+ {
+ return _txnCount.get();
+ }
+
public Principal getPrincipal()
{
return _principal;
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org