You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/12/05 21:08:54 UTC

svn commit: r1772807 - in /qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0: Session_1_0.java TxnCoordinatorReceivingLink_1_0.java

Author: rgodfrey
Date: Mon Dec  5 21:08:54 2016
New Revision: 1772807

URL: http://svn.apache.org/viewvc?rev=1772807&view=rev
Log:
QPID-7546 : implement transaction counting statistics

Modified:
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1772807&r1=1772806&r2=1772807&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Mon Dec  5 21:08:54 2016
@@ -49,7 +49,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.connection.SessionPrincipal;
-import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.consumer.ScheduledConsumerTargetSet;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.LogMessage;
@@ -176,6 +175,10 @@ public class Session_1_0 implements AMQS
                       "Force detach the link because the session is remotely ended.");
 
     private final Set<Object> _blockingEntities = Collections.newSetFromMap(new ConcurrentHashMap<Object,Boolean>());
+    private volatile long _startedTransactions;
+    private volatile long _committedTransactions;
+    private volatile long _rolledBackTransactions;
+    private volatile int _unacknowledgedMessages;
 
 
     public Session_1_0(final AMQPConnection_1_0 connection)
@@ -1213,7 +1216,7 @@ public class Session_1_0 implements AMQS
         for(int i = 0; i < txnId.getLength(); i++)
         {
             id <<= 8;
-            id += ((int)data[i+txnId.getArrayOffset()] & 0xff);
+            id |= ((int)data[i+txnId.getArrayOffset()] & 0xff);
         }
 
         return id;
@@ -1456,29 +1459,25 @@ public class Session_1_0 implements AMQS
     @Override
     public int getUnacknowledgedMessageCount()
     {
-        // TODO
-        return 0;
+        return _unacknowledgedMessages;
     }
 
     @Override
     public long getTxnStart()
     {
-        // TODO
-        return 0l;
+        return _startedTransactions;
     }
 
     @Override
     public long getTxnCommits()
     {
-        // TODO
-        return 0l;
+        return _committedTransactions;
     }
 
     @Override
     public long getTxnRejects()
     {
-        // TODO
-        return 0L;
+        return _rolledBackTransactions;
     }
 
     @Override
@@ -1668,6 +1667,31 @@ public class Session_1_0 implements AMQS
         }
     }
 
+    void incrementStartedTransactions()
+    {
+        _startedTransactions++;
+    }
+
+    void incrementCommittedTransactions()
+    {
+        _committedTransactions++;
+    }
+
+    void incrementRolledBackTransactions()
+    {
+        _rolledBackTransactions++;
+    }
+
+    void incrementUnacknowledged()
+    {
+        _unacknowledgedMessages++;
+    }
+
+    void decrementUnacknowledged()
+    {
+        _unacknowledgedMessages--;
+    }
+
     private class ConsumerClosedListener extends AbstractConfigurationChangeListener
     {
         @Override

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java?rev=1772807&r1=1772806&r2=1772807&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java Mon Dec  5 21:08:54 2016
@@ -74,7 +74,6 @@ public class TxnCoordinatorReceivingLink
 
     public void messageTransfer(Transfer xfr)
     {
-        // TODO - cope with fragmented messages
 
         QpidByteBuffer payload = null;
 
@@ -115,7 +114,7 @@ public class TxnCoordinatorReceivingLink
             xfr.dispose();
         }
 
-        // Only interested int he amqp-value section that holds the message to the coordinator
+        // Only interested in the amqp-value section that holds the message to the coordinator
         try
         {
             List<Section> sections = _sectionDecoder.parseAll(payload);
@@ -141,7 +140,7 @@ public class TxnCoordinatorReceivingLink
 
                         Declared state = new Declared();
 
-
+                        _session.incrementStartedTransactions();
 
                         state.setTxnId(_session.integerToBinary(txnId));
                         _endpoint.updateDisposition(deliveryTag, state, true);
@@ -176,7 +175,6 @@ public class TxnCoordinatorReceivingLink
 
     public void remoteDetached(LinkEndpoint endpoint, Detach detach)
     {
-        //TODO
         endpoint.detach();
     }
 
@@ -195,10 +193,12 @@ public class TxnCoordinatorReceivingLink
             if(fail)
             {
                 txn.rollback();
+                _session.incrementRolledBackTransactions();
             }
             else
             {
                 txn.commit();
+                _session.incrementCommittedTransactions();
             }
             _openTransactions.remove(transactionId);
         }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org