You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/10/26 16:10:11 UTC

[2/2] qpid-broker-j git commit: QPID-7646: [Broker-J] Improve transaction related statistics

QPID-7646: [Broker-J] Improve transaction related statistics

* Move transaction begin/rollback/open statistics from session into connection
* Add connection statistics oldestTransactionStartTime
* Remove transaction related statistics from session
* Add statistics for transacted message count


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/4e8ecd7b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/4e8ecd7b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/4e8ecd7b

Branch: refs/heads/master
Commit: 4e8ecd7b1fe4e70c165f79dc72d15df1bf8fd0c8
Parents: cb6e719
Author: Alex Rudyy <or...@apache.org>
Authored: Wed Oct 25 15:14:49 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Thu Oct 26 17:05:01 2017 +0100

----------------------------------------------------------------------
 .../org/apache/qpid/server/model/Broker.java    |  13 +
 .../apache/qpid/server/model/BrokerImpl.java    |  53 +++-
 .../apache/qpid/server/model/Connection.java    |  31 ++
 .../org/apache/qpid/server/model/Session.java   |  27 --
 .../apache/qpid/server/session/AMQPSession.java |   6 -
 .../server/session/AbstractAMQPSession.java     |  32 --
 .../qpid/server/stats/StatisticsGatherer.java   |   8 +-
 .../qpid/server/transport/AMQPConnection.java   |  16 +
 .../transport/AbstractAMQPConnection.java       | 124 +++++++-
 .../qpid/server/txn/LocalTransaction.java       |  12 +
 .../server/virtualhost/AbstractVirtualHost.java |  54 +++-
 .../virtualhost/QueueManagingVirtualHost.java   |  12 +
 .../protocol/v0_10/AMQPConnection_0_10Impl.java |  13 +
 .../server/protocol/v0_10/ServerSession.java    |  79 ++---
 .../server/protocol/v0_10/Session_0_10.java     |  18 --
 .../qpid/server/protocol/v0_8/AMQChannel.java   | 100 ++----
 .../protocol/v0_8/AMQPConnection_0_8Impl.java   |  16 +
 .../protocol/v1_0/AMQPConnection_1_0.java       |   3 -
 .../protocol/v1_0/AMQPConnection_1_0Impl.java   |   8 +-
 .../protocol/v1_0/ConsumerTarget_1_0.java       |   1 +
 .../qpid/server/protocol/v1_0/Session_1_0.java  |  36 ---
 .../v1_0/StandardReceivingLinkEndpoint.java     |  11 +-
 .../TxnCoordinatorReceivingLinkEndpoint.java    |  17 +-
 .../src/main/java/resources/css/common.css      |   8 +-
 .../js/qpid/common/StatisticsWidget.js          |   4 +-
 .../resources/js/qpid/management/Connection.js  |  24 --
 .../qpid/systest/rest/ConnectionRestTest.java   | 308 +++++++++----------
 .../qpid/systest/rest/SessionRestTest.java      | 160 ++++++++++
 test-profiles/Java10Excludes                    |   2 +-
 29 files changed, 696 insertions(+), 500 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java b/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
index e462b83..a10fa55 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
@@ -202,6 +202,19 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
                       description = "Total number of messages delivered by the Broker.")
     long getMessagesOut();
 
+
+    @SuppressWarnings("unused")
+    @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.MESSAGES,
+            label = "Transacted Inbound",
+            description = "Total number of messages received by the Broker within a transaction.")
+    long getTransactedMessagesIn();
+
+    @SuppressWarnings("unused")
+    @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.MESSAGES,
+            label = "Transacted Outbound",
+            description = "Total number of messages delivered by the Broker within a transaction.")
+    long getTransactedMessagesOut();
+
     @ManagedOperation(nonModifying = true,
             description = "Initiates an orderly shutdown of the Broker.",
             changesConfiguredObjectState = false)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
index 7c484d3..69212a3 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
@@ -114,7 +114,12 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
 
     private AuthenticationProvider<?> _managementModeAuthenticationProvider;
 
-    private final AtomicLong _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
+    private final AtomicLong _messagesIn = new AtomicLong();
+    private final AtomicLong _messagesOut = new AtomicLong();
+    private final AtomicLong _transactedMessagesIn = new AtomicLong();
+    private final AtomicLong _transactedMessagesOut = new AtomicLong();
+    private final AtomicLong _bytesIn = new AtomicLong();
+    private final AtomicLong _bytesOut = new AtomicLong();
 
     @ManagedAttributeField
     private int _statisticsReportingPeriod;
@@ -165,12 +170,6 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
         QpidServiceLoader qpidServiceLoader = new QpidServiceLoader();
         final Set<String> systemNodeCreatorTypes = qpidServiceLoader.getInstancesByType(SystemNodeCreator.class).keySet();
         _virtualHostPropertiesNodeEnabled = systemNodeCreatorTypes.contains(VirtualHostPropertiesNodeCreator.TYPE);
-        _messagesDelivered = new AtomicLong();
-        _dataDelivered = new AtomicLong();
-        _messagesReceived = new AtomicLong();
-        _dataReceived = new AtomicLong();
-
-
     }
 
     private void registerSystemAddressSpaces()
@@ -809,15 +808,27 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
     @Override
     public void registerMessageDelivered(long messageSize)
     {
-        _messagesDelivered.incrementAndGet();
-        _dataDelivered.addAndGet(messageSize);
+        _messagesOut.incrementAndGet();
+        _bytesOut.addAndGet(messageSize);
+    }
+
+    @Override
+    public void registerTransactedMessageReceived()
+    {
+        _transactedMessagesIn.incrementAndGet();
+    }
+
+    @Override
+    public void registerTransactedMessageDelivered()
+    {
+        _transactedMessagesOut.incrementAndGet();
     }
 
     @Override
     public void registerMessageReceived(long messageSize)
     {
-        _messagesReceived.incrementAndGet();
-        _dataReceived.addAndGet(messageSize);
+        _messagesIn.incrementAndGet();
+        _bytesIn.addAndGet(messageSize);
     }
 
     @Override
@@ -841,25 +852,37 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
     @Override
     public long getMessagesIn()
     {
-        return _messagesReceived.get();
+        return _messagesIn.get();
     }
 
     @Override
     public long getBytesIn()
     {
-        return _dataReceived.get();
+        return _bytesIn.get();
     }
 
     @Override
     public long getMessagesOut()
     {
-        return _messagesDelivered.get();
+        return _messagesOut.get();
     }
 
     @Override
     public long getBytesOut()
     {
-        return _dataDelivered.get();
+        return _bytesOut.get();
+    }
+
+    @Override
+    public long getTransactedMessagesIn()
+    {
+        return _transactedMessagesIn.get();
+    }
+
+    @Override
+    public long getTransactedMessagesOut()
+    {
+        return _transactedMessagesOut.get();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java b/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
index 95db723..14ee17a 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
@@ -147,6 +147,37 @@ public interface Connection<X extends Connection<X>> extends ConfiguredObject<X>
                       description = "Current number of sessions belonging to this connection.")
     int getSessionCount();
 
+    @SuppressWarnings("unused")
+    @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.COUNT,
+            label = "Transactions", description = "Total number of transactions started.")
+    long getLocalTransactionBegins();
+
+    @SuppressWarnings("unused")
+    @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.COUNT,
+            label = "Rolled-back Transactions", description = "Total number of rolled-back transactions.")
+    long getLocalTransactionRollbacks();
+
+    @SuppressWarnings("unused")
+    @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.COUNT,
+            label = "Open Transactions", description = "Current number of open transactions.")
+    long getLocalTransactionOpen();
+
+    @SuppressWarnings("unused")
+    @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.ABSOLUTE_TIME,
+            label= "Oldest transaction start time",
+            description = "The start time of the oldest transaction or null if no transaction is in progress.")
+    Date getOldestTransactionStartTime();
+
+    @SuppressWarnings("unused")
+    @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.MESSAGES, label = "Transacted Inbound",
+            description = "Total number of messages received by this connection within a transaction.")
+    long getTransactedMessagesIn();
+
+    @SuppressWarnings("unused")
+    @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.MESSAGES, label = "Transacted Outbound",
+            description = "Total number of messages delivered by this connection within a transaction.")
+    long getTransactedMessagesOut();
+
     //children
     Collection<Session> getSessions();
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Session.java b/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
index 91f7cdb..ccb396a 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
@@ -20,8 +20,6 @@
  */
 package org.apache.qpid.server.model;
 
-import java.util.Date;
-
 @ManagedObject( creatable = false, amqpName = "org.apache.qpid.Session")
 public interface Session<X extends Session<X>> extends ConfiguredObject<X>
 {
@@ -58,31 +56,6 @@ public interface Session<X extends Session<X>> extends ConfiguredObject<X>
     @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.COUNT, label = "Consumers")
     long getConsumerCount();
 
-    @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.COUNT, label = "Transactions")
-    long getLocalTransactionBegins();
-
-    @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.COUNT, label = "Open Transactions")
-    int getLocalTransactionOpen();
-
-    @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.COUNT, label = "Rolled-back Transactions")
-    long getLocalTransactionRollbacks();
-
     @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.MESSAGES, label = "Prefetched")
     long getUnacknowledgedMessages();
-
-    /**
-     * Return the time the current transaction started.
-     *
-     * @return the time this transaction started or 0 if not in a transaction
-     */
-    @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.ABSOLUTE_TIME, label = "Last Transaction Start")
-    Date getTransactionStartTime();
-
-    /**
-     * Return the time of the last activity on the current transaction.
-     *
-     * @return the time of the last activity or 0 if not in a transaction
-     */
-    @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.ABSOLUTE_TIME, label = "Last Transaction Update")
-    Date getTransactionUpdateTime();
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java b/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
index 4c24e71..1b224f1 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
@@ -59,12 +59,6 @@ public interface AMQPSession<S extends org.apache.qpid.server.session.AMQPSessio
 
     int getUnacknowledgedMessageCount();
 
-    long getTxnStart();
-
-    long getTxnCommits();
-
-    long getTxnRejects();
-
     @Override
     long getConsumerCount();
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
index 075e213..b68ed7f 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
@@ -22,7 +22,6 @@ package org.apache.qpid.server.session;
 
 import java.security.AccessControlContext;
 import java.util.ArrayList;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -170,25 +169,6 @@ public abstract class AbstractAMQPSession<S extends AbstractAMQPSession<S, X>,
     }
 
     @Override
-    public int getLocalTransactionOpen()
-    {
-        long open = getTxnStart() - (getTxnCommits() + getTxnRejects());
-        return (open > 0L) ? 1 : 0;
-    }
-
-    @Override
-    public long getLocalTransactionBegins()
-    {
-        return getTxnStart();
-    }
-
-    @Override
-    public long getLocalTransactionRollbacks()
-    {
-        return getTxnRejects();
-    }
-
-    @Override
     public long getUnacknowledgedMessages()
     {
         return getUnacknowledgedMessageCount();
@@ -206,18 +186,6 @@ public abstract class AbstractAMQPSession<S extends AbstractAMQPSession<S, X>,
         _taskList.remove(task);
     }
 
-    @Override
-    public Date getTransactionStartTime()
-    {
-        return new Date(getTransactionStartTimeLong());
-    }
-
-    @Override
-    public Date getTransactionUpdateTime()
-    {
-        return new Date(getTransactionUpdateTimeLong());
-    }
-
     @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
     private ListenableFuture<Void> doDelete()
     {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java b/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java
index ecc6dbd..72f6513 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java
@@ -42,7 +42,9 @@ public interface StatisticsGatherer
      * @param messageSize the size in bytes of the delivered message
      */
     void registerMessageReceived(long messageSize);
-    
+
+    void registerTransactedMessageReceived();
+
     /**
      * This method is responsible for registering the delivery of a message
      * with the counters.
@@ -50,7 +52,9 @@ public interface StatisticsGatherer
      * @param messageSize the size in bytes of the delivered message
      */
     void registerMessageDelivered(long messageSize);
-    
+
+    void registerTransactedMessageDelivered();
+
     /**
      * Returns a number of delivered messages
      * 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java b/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
index 2ecef32..6da15b1 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
@@ -24,6 +24,7 @@ import java.net.SocketAddress;
 import java.security.AccessControlContext;
 import java.security.Principal;
 import java.util.Collection;
+import java.util.Iterator;
 
 import javax.security.auth.Subject;
 
@@ -35,6 +36,7 @@ import org.apache.qpid.server.model.Connection;
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.session.AMQPSession;
 import org.apache.qpid.server.txn.LocalTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Deletable;
 
 public interface AMQPConnection<C extends AMQPConnection<C>>
@@ -70,6 +72,10 @@ public interface AMQPConnection<C extends AMQPConnection<C>>
     // See also QPID-7689: https://issues.apache.org/jira/browse/QPID-7689?focusedCommentId=16022923#comment-16022923
     void registerMessageDelivered(long size);
 
+    void registerTransactedMessageReceived();
+
+    void registerTransactedMessageDelivered();
+
     void closeSessionAsync(AMQPSession<?,?> session, CloseReason reason, String message);
 
     SocketAddress getRemoteSocketAddress();
@@ -88,6 +94,16 @@ public interface AMQPConnection<C extends AMQPConnection<C>>
 
     LocalTransaction createLocalTransaction();
 
+    void incrementTransactionRollbackCounter();
+
+    void decrementTransactionOpenCounter();
+
+    void incrementTransactionOpenCounter();
+
+    void incrementTransactionBeginCounter();
+
+    Iterator<ServerTransaction> getOpenTransactions();
+
     enum CloseReason
     {
         MANAGEMENT,

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java b/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
index 065cc5b..0b16b07 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
@@ -30,6 +30,7 @@ import java.security.PrivilegedAction;
 import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -73,6 +74,7 @@ import org.apache.qpid.server.transport.network.NetworkConnection;
 import org.apache.qpid.server.transport.network.Ticker;
 import org.apache.qpid.server.txn.FlowToDiskTransactionObserver;
 import org.apache.qpid.server.txn.LocalTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.txn.TransactionObserver;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.FixedKeyMapCreator;
@@ -105,7 +107,17 @@ public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,
 
     private String _clientId;
     private volatile boolean _stopped;
-    private final AtomicLong _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
+
+    private final AtomicLong _messagesIn = new AtomicLong();
+    private final AtomicLong _messagesOut = new AtomicLong();
+    private final AtomicLong _transactedMessagesIn = new AtomicLong();
+    private final AtomicLong _transactedMessagesOut = new AtomicLong();
+    private final AtomicLong _bytesIn = new AtomicLong();
+    private final AtomicLong _bytesOut = new AtomicLong();
+    private final AtomicLong _localTransactionBegins = new AtomicLong();
+    private final AtomicLong _localTransactionRollbacks = new AtomicLong();
+    private final AtomicLong _localTransactionOpens = new AtomicLong();
+
     private final SettableFuture<Void> _transportClosedFuture = SettableFuture.create();
     private final SettableFuture<Void> _modelClosedFuture = SettableFuture.create();
     private final AtomicBoolean _modelClosing = new AtomicBoolean();
@@ -147,11 +159,6 @@ public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,
 
         updateAccessControllerContext();
 
-        _messagesDelivered = new AtomicLong();
-        _dataDelivered = new AtomicLong();
-        _messagesReceived = new AtomicLong();
-        _dataReceived = new AtomicLong();
-
         _transportClosedFuture.addListener(
                 new Runnable()
                 {
@@ -431,19 +438,33 @@ public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,
     @Override
     public void registerMessageDelivered(long messageSize)
     {
-        _messagesDelivered.incrementAndGet();
-        _dataDelivered.addAndGet(messageSize);
+        _messagesOut.incrementAndGet();
+        _bytesOut.addAndGet(messageSize);
         _statisticsGatherer.registerMessageDelivered(messageSize);
     }
 
     @Override
     public void registerMessageReceived(long messageSize)
     {
-        _messagesReceived.incrementAndGet();
-        _dataReceived.addAndGet(messageSize);
+        _messagesIn.incrementAndGet();
+        _bytesIn.addAndGet(messageSize);
         _statisticsGatherer.registerMessageReceived(messageSize);
     }
 
+    @Override
+    public void registerTransactedMessageDelivered()
+    {
+        _transactedMessagesOut.incrementAndGet();
+        _statisticsGatherer.registerTransactedMessageDelivered();
+    }
+
+    @Override
+    public void registerTransactedMessageReceived()
+    {
+        _transactedMessagesIn.incrementAndGet();
+        _statisticsGatherer.registerTransactedMessageReceived();
+    }
+
     public void setClientProduct(final String clientProduct)
     {
         _clientProduct = clientProduct;
@@ -631,25 +652,37 @@ public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,
     @Override
     public long getBytesIn()
     {
-        return _dataReceived.get();
+        return _bytesIn.get();
     }
 
     @Override
     public long getBytesOut()
     {
-        return _dataDelivered.get();
+        return _bytesOut.get();
     }
 
     @Override
     public long getMessagesIn()
     {
-        return _messagesReceived.get();
+        return _messagesIn.get();
     }
 
     @Override
     public long getMessagesOut()
     {
-        return _messagesDelivered.get();
+        return _messagesOut.get();
+    }
+
+    @Override
+    public long getTransactedMessagesIn()
+    {
+        return _transactedMessagesIn.get();
+    }
+
+    @Override
+    public long getTransactedMessagesOut()
+    {
+        return _transactedMessagesOut.get();
     }
 
     public AccessControlContext getAccessControllerContext()
@@ -850,6 +883,8 @@ public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,
     @Override
     public LocalTransaction createLocalTransaction()
     {
+        _localTransactionBegins.incrementAndGet();
+        _localTransactionOpens.incrementAndGet();
         return new LocalTransaction(getAddressSpace().getMessageStore(),
                                     () -> getLastReadTime(),
                                     _transactionObserver);
@@ -931,4 +966,65 @@ public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,
         return getNetwork().getPeerPrincipal();
     }
 
+    @Override
+    public Date getOldestTransactionStartTime()
+    {
+        long oldest = Long.MAX_VALUE;
+        Iterator<ServerTransaction> iterator = getOpenTransactions();
+        while (iterator.hasNext())
+        {
+            final ServerTransaction value = iterator.next();
+            if (value instanceof LocalTransaction)
+            {
+                long transactionStartTimeLong = value.getTransactionStartTime();
+                if (transactionStartTimeLong > 0 && oldest > transactionStartTimeLong)
+                {
+                    oldest = transactionStartTimeLong;
+                }
+            }
+        }
+        return oldest == Long.MAX_VALUE ? null : new Date(oldest);
+    }
+
+    @Override
+    public long getLocalTransactionBegins()
+    {
+        return _localTransactionBegins.get();
+    }
+
+    @Override
+    public long getLocalTransactionOpen()
+    {
+        return _localTransactionOpens.get();
+    }
+
+    @Override
+    public long getLocalTransactionRollbacks()
+    {
+        return _localTransactionRollbacks.get();
+    }
+
+    @Override
+    public void incrementTransactionRollbackCounter()
+    {
+        _localTransactionRollbacks.incrementAndGet();
+    }
+
+    @Override
+    public void decrementTransactionOpenCounter()
+    {
+        _localTransactionOpens.decrementAndGet();
+    }
+
+    @Override
+    public void incrementTransactionOpenCounter()
+    {
+        _localTransactionOpens.incrementAndGet();
+    }
+
+    @Override
+    public void incrementTransactionBeginCounter()
+    {
+        _localTransactionBegins.incrementAndGet();
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
index 7df5a58..767e0ca 100755
--- a/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
@@ -60,6 +60,7 @@ public class LocalTransaction implements ServerTransaction
     private volatile long _txnUpdateTime = 0l;
     private ListenableFuture<Runnable> _asyncTran;
     private volatile boolean _isRollbackOnly;
+    private volatile boolean _outstandingWork;
 
     public LocalTransaction(MessageStore transactionLog)
     {
@@ -103,6 +104,7 @@ public class LocalTransaction implements ServerTransaction
     public void dequeue(MessageEnqueueRecord record, Action postTransactionAction)
     {
         sync();
+        _outstandingWork = true;
         _postTransactionActions.add(postTransactionAction);
         initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
 
@@ -129,6 +131,7 @@ public class LocalTransaction implements ServerTransaction
     public void dequeue(Collection<MessageInstance> queueEntries, Action postTransactionAction)
     {
         sync();
+        _outstandingWork = true;
         _postTransactionActions.add(postTransactionAction);
         initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
 
@@ -192,6 +195,7 @@ public class LocalTransaction implements ServerTransaction
     public void enqueue(TransactionLogResource queue, EnqueueableMessage message, EnqueueAction postTransactionAction)
     {
         sync();
+        _outstandingWork = true;
         initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
         _transactionObserver.onMessageEnqueue(this, message);
         if(queue.getMessageDurability().persist(message.isPersistent()))
@@ -276,6 +280,7 @@ public class LocalTransaction implements ServerTransaction
     public void enqueue(Collection<? extends BaseQueue> queues, EnqueueableMessage message, EnqueueAction postTransactionAction)
     {
         sync();
+        _outstandingWork = true;
         initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
         _transactionObserver.onMessageEnqueue(this, message);
         try
@@ -525,6 +530,7 @@ public class LocalTransaction implements ServerTransaction
 
     private void resetDetails()
     {
+        _outstandingWork = false;
         _transactionObserver.onDischarge(this);
         _asyncTran = null;
         _transaction = null;
@@ -554,4 +560,10 @@ public class LocalTransaction implements ServerTransaction
     {
         return _isRollbackOnly;
     }
+
+
+    public boolean hasOutstandingWork()
+    {
+        return _outstandingWork;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 389f9a6..062b48d 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -175,7 +175,12 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
 
     private final SystemNodeRegistry _systemNodeRegistry = new SystemNodeRegistry();
 
-    private final AtomicLong _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
+    private final AtomicLong _messagesIn = new AtomicLong();
+    private final AtomicLong _messagesOut = new AtomicLong();
+    private final AtomicLong _transactedMessagesIn = new AtomicLong();
+    private final AtomicLong _transactedMessagesOut = new AtomicLong();
+    private final AtomicLong _bytesIn = new AtomicLong();
+    private final AtomicLong _bytesOut = new AtomicLong();
 
     private volatile LinkRegistryModel _linkRegistry;
     private AtomicBoolean _blocked = new AtomicBoolean();
@@ -283,11 +288,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
 
         _eventLogger.message(VirtualHostMessages.CREATED(getName()));
 
-
-        _messagesDelivered = new AtomicLong();
-        _dataDelivered = new AtomicLong();
-        _messagesReceived = new AtomicLong();
-        _dataReceived = new AtomicLong();
         _principal = new VirtualHostPrincipal(this);
 
         if (systemConfig.isManagementMode())
@@ -1648,41 +1648,67 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
     @Override
     public void registerMessageDelivered(long messageSize)
     {
-        _messagesDelivered.incrementAndGet();
-        _dataDelivered.addAndGet(messageSize);
+        _messagesOut.incrementAndGet();
+        _bytesOut.addAndGet(messageSize);
         _broker.registerMessageDelivered(messageSize);
     }
 
     @Override
     public void registerMessageReceived(long messageSize)
     {
-        _messagesReceived.incrementAndGet();
-        _dataReceived.addAndGet(messageSize);
+        _messagesIn.incrementAndGet();
+        _bytesIn.addAndGet(messageSize);
         _broker.registerMessageReceived(messageSize);
     }
 
     @Override
+    public void registerTransactedMessageReceived()
+    {
+        _transactedMessagesIn.incrementAndGet();
+        _broker.registerTransactedMessageReceived();
+    }
+
+    @Override
+    public void registerTransactedMessageDelivered()
+    {
+        _transactedMessagesOut.incrementAndGet();
+        _broker.registerTransactedMessageDelivered();
+    }
+
+    @Override
     public long getMessagesIn()
     {
-        return _messagesReceived.get();
+        return _messagesIn.get();
     }
 
     @Override
     public long getBytesIn()
     {
-        return _dataReceived.get();
+        return _bytesIn.get();
     }
 
     @Override
     public long getMessagesOut()
     {
-        return _messagesDelivered.get();
+        return _messagesOut.get();
     }
 
     @Override
     public long getBytesOut()
     {
-        return _dataDelivered.get();
+        return _bytesOut.get();
+    }
+
+    @Override
+    public long getTransactedMessagesIn()
+    {
+        return _transactedMessagesIn.get();
+    }
+
+    @Override
+    public long getTransactedMessagesOut()
+    {
+        return _transactedMessagesOut.get();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
index 42d2419..8ca90f9 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
@@ -221,6 +221,18 @@ public interface QueueManagingVirtualHost<X extends QueueManagingVirtualHost<X>>
     long getMessagesOut();
 
     @SuppressWarnings("unused")
+    @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.MESSAGES,
+            label = "Transacted Inbound",
+            description = "Total number of messages received by this virtualhost within a transaction.")
+    long getTransactedMessagesIn();
+
+    @SuppressWarnings("unused")
+    @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.MESSAGES,
+            label = "Transacted Outbound",
+            description = "Total number of messages delivered by this virtualhost within a transaction.")
+    long getTransactedMessagesOut();
+
+    @SuppressWarnings("unused")
     @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.BYTES, label = "Queue Depth",
             description = "Current size of all messages enqueued by this virtualhost.")
     long getTotalDepthOfQueuesBytes();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
index ec9ebfa..be5c1c1 100755
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
@@ -48,6 +48,8 @@ import org.apache.qpid.server.transport.AggregateTicker;
 import org.apache.qpid.server.transport.ByteBufferSender;
 import org.apache.qpid.server.transport.ProtocolEngine;
 import org.apache.qpid.server.transport.ServerNetworkConnection;
+import org.apache.qpid.server.txn.LocalTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
@@ -382,4 +384,15 @@ public class AMQPConnection_0_10Impl extends AbstractAMQPConnection<AMQPConnecti
                 throw new IllegalStateException("Unsupported state " + state);
         }
     }
+
+    @Override
+    public Iterator<ServerTransaction> getOpenTransactions()
+    {
+        return getSessionModels().stream()
+                                 .filter(sessionModel -> sessionModel.getServerSession()
+                                                                     .getTransaction() instanceof LocalTransaction)
+                                 .map(sessionModel -> sessionModel.getServerSession().getTransaction())
+                                 .iterator();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index aac916a..f2bb0cc 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -60,7 +60,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.security.auth.Subject;
@@ -152,12 +151,7 @@ public class ServerSession extends SessionInvoker
     private final SortedMap<Integer, MessageDispositionChangeListener> _messageDispositionListenerMap =
             new ConcurrentSkipListMap<>();
 
-    private ServerTransaction _transaction;
-    private final AtomicLong _txnStarts = new AtomicLong(0);
-    private final AtomicLong _txnCommits = new AtomicLong(0);
-    private final AtomicLong _txnRejects = new AtomicLong(0);
-
-    private final AtomicLong _txnCount = new AtomicLong(0);
+    private volatile ServerTransaction _transaction;
     private Map<String, ConsumerTarget_0_10> _subscriptions = new ConcurrentHashMap<String, ConsumerTarget_0_10>();
 
     private AtomicReference<LogMessage> _forcedCloseLogMessage = new AtomicReference<LogMessage>();
@@ -972,7 +966,10 @@ public class ServerSession extends SessionInvoker
                 exchange.route(message, message.getInitialRoutingAddress(), instanceProperties);
         result.send(_transaction, null);
         getAMQPConnection().registerMessageReceived(message.getSize());
-        incrementOutstandingTxnsIfNecessary();
+        if (isTransactional())
+        {
+            getAMQPConnection().registerTransactedMessageReceived();
+        }
         return result;
     }
 
@@ -980,6 +977,10 @@ public class ServerSession extends SessionInvoker
                             Runnable postIdSettingAction)
     {
         getAMQPConnection().registerMessageDelivered(xfr.getBodySize());
+        if (_transaction.isTransactional())
+        {
+            getAMQPConnection().registerTransactedMessageDelivered();
+        }
         invoke(xfr, postIdSettingAction);
     }
 
@@ -1136,8 +1137,14 @@ public class ServerSession extends SessionInvoker
 
     public void onClose()
     {
+        AMQPConnection_0_10 amqpConnection = getAMQPConnection();
         if(_transaction instanceof LocalTransaction)
         {
+            if (((LocalTransaction) _transaction).hasOutstandingWork())
+            {
+                amqpConnection.incrementTransactionRollbackCounter();
+            }
+            amqpConnection.decrementTransactionOpenCounter();
             _transaction.rollback();
         }
         else if(_transaction instanceof DistributedTransaction)
@@ -1161,7 +1168,7 @@ public class ServerSession extends SessionInvoker
         {
             operationalLoggingMessage = ChannelMessages.CLOSE();
         }
-        getAMQPConnection().getEventLogger().message(getLogSubject(), operationalLoggingMessage);
+        amqpConnection.getEventLogger().message(getLogSubject(), operationalLoggingMessage);
     }
 
     protected void awaitClose()
@@ -1225,10 +1232,14 @@ public class ServerSession extends SessionInvoker
         return _transaction.isTransactional();
     }
 
+    ServerTransaction getTransaction()
+    {
+        return _transaction;
+    }
+
     public void selectTx()
     {
         _transaction = getConnection().getAmqpConnection().createLocalTransaction();
-        _txnStarts.incrementAndGet();
     }
 
     public void selectDtx()
@@ -1330,50 +1341,15 @@ public class ServerSession extends SessionInvoker
     public void commit()
     {
         _transaction.commit();
-
-        _txnCommits.incrementAndGet();
-        _txnStarts.incrementAndGet();
-        decrementOutstandingTxnsIfNecessary();
+        getAMQPConnection().incrementTransactionBeginCounter();
     }
 
     public void rollback()
     {
         _transaction.rollback();
-
-        _txnRejects.incrementAndGet();
-        _txnStarts.incrementAndGet();
-        decrementOutstandingTxnsIfNecessary();
-    }
-
-
-    private void incrementOutstandingTxnsIfNecessary()
-    {
-        if(isTransactional())
-        {
-            //There can currently only be at most one outstanding transaction
-            //due to only having LocalTransaction support. Set value to 1 if 0.
-            _txnCount.compareAndSet(0,1);
-        }
-    }
-
-    private void decrementOutstandingTxnsIfNecessary()
-    {
-        if(isTransactional())
-        {
-            //There can currently only be at most one outstanding transaction
-            //due to only having LocalTransaction support. Set value to 0 if 1.
-            _txnCount.compareAndSet(1,0);
-        }
-    }
-
-    public long getTxnCommits()
-    {
-        return _txnCommits.get();
-    }
-
-    public long getTxnRejects()
-    {
-        return _txnRejects.get();
+        AMQPConnection_0_10 amqpConnection = getAMQPConnection();
+        amqpConnection.incrementTransactionRollbackCounter();
+        amqpConnection.incrementTransactionBeginCounter();
     }
 
     public int getChannelId()
@@ -1381,11 +1357,6 @@ public class ServerSession extends SessionInvoker
         return getChannel();
     }
 
-    public long getTxnStart()
-    {
-        return _txnStarts.get();
-    }
-
     public Principal getAuthorizedPrincipal()
     {
         return getConnection().getAuthorizedPrincipal();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
index 581e4ea..62e98fa 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
@@ -107,24 +107,6 @@ public class Session_0_10 extends AbstractAMQPSession<Session_0_10, ConsumerTarg
     }
 
     @Override
-    public long getTxnRejects()
-    {
-        return _serverSession.getTxnRejects();
-    }
-
-    @Override
-    public long getTxnCommits()
-    {
-        return _serverSession.getTxnCommits();
-    }
-
-    @Override
-    public long getTxnStart()
-    {
-        return _serverSession.getTxnStart();
-    }
-
-    @Override
     public int getUnacknowledgedMessageCount()
     {
         return _serverSession.getUnacknowledgedMessageCount();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index c7c0f3e..29a591a 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -40,7 +40,6 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 import javax.security.auth.Subject;
 
@@ -164,12 +163,7 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
 
     private final AtomicBoolean _suspended = new AtomicBoolean(false);
 
-    private ServerTransaction _transaction;
-
-    private final AtomicLong _txnStarts = new AtomicLong(0);
-    private final AtomicLong _txnCommits = new AtomicLong(0);
-    private final AtomicLong _txnRejects = new AtomicLong(0);
-    private final AtomicLong _txnCount = new AtomicLong(0);
+    private volatile ServerTransaction _transaction;
 
     private final AMQPConnection_0_8 _connection;
     private final AtomicBoolean _closing = new AtomicBoolean(false);
@@ -299,14 +293,18 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
     private void setLocalTransactional()
     {
         _transaction = _connection.createLocalTransaction();
-        _txnStarts.incrementAndGet();
     }
 
-    private boolean isTransactional()
+    boolean isTransactional()
     {
         return _transaction.isTransactional();
     }
 
+    ServerTransaction getTransaction()
+    {
+        return _transaction;
+    }
+
     public void receivedComplete()
     {
         AccessController.doPrivileged(new PrivilegedAction<Void>()
@@ -321,44 +319,6 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
 
     }
 
-    private void incrementOutstandingTxnsIfNecessary()
-    {
-        if(isTransactional())
-        {
-            //There can currently only be at most one outstanding transaction
-            //due to only having LocalTransaction support. Set value to 1 if 0.
-            _txnCount.compareAndSet(0,1);
-        }
-    }
-
-    private void decrementOutstandingTxnsIfNecessary()
-    {
-        if(isTransactional())
-        {
-            //There can currently only be at most one outstanding transaction
-            //due to only having LocalTransaction support. Set value to 0 if 1.
-            _txnCount.compareAndSet(1,0);
-        }
-    }
-
-    @Override
-    public long getTxnCommits()
-    {
-        return _txnCommits.get();
-    }
-
-    @Override
-    public long getTxnRejects()
-    {
-        return _txnRejects.get();
-    }
-
-    @Override
-    public long getTxnStart()
-    {
-        return _txnStarts.get();
-    }
-
     private void setPublishFrame(MessagePublishInfo info, final MessageDestination e)
     {
         _currentMessage = new IncomingMessage(info);
@@ -527,13 +487,16 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
                                                                        .createBasicAckBody(_confirmedMessageCounter, false);
                                 _connection.writeFrame(responseBody.generateFrame(_channelId));
                             }
-                            incrementOutstandingTxnsIfNecessary();
                         }
                     }
                 }
                 finally
                 {
                     _connection.registerMessageReceived(bodySize);
+                    if (isTransactional())
+                    {
+                        _connection.registerTransactedMessageReceived();
+                    }
                     _currentMessage = null;
                 }
             }
@@ -794,6 +757,15 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
                 task.performAction(this);
             }
 
+            if (_transaction instanceof LocalTransaction)
+            {
+                if (((LocalTransaction) _transaction).hasOutstandingWork())
+                {
+                    _connection.incrementTransactionRollbackCounter();
+                }
+                _connection.decrementTransactionOpenCounter();
+            }
+
             _transaction.rollback();
 
             requeue();
@@ -1111,9 +1083,7 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
                     }
                     finally
                     {
-                        _txnCommits.incrementAndGet();
-                        _txnStarts.incrementAndGet();
-                        decrementOutstandingTxnsIfNecessary();
+                        _connection.incrementTransactionBeginCounter();
                     }
                 }
             });
@@ -1121,10 +1091,7 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
         else
         {
             _transaction.commit(immediateAction);
-
-            _txnCommits.incrementAndGet();
-            _txnStarts.incrementAndGet();
-            decrementOutstandingTxnsIfNecessary();
+            _connection.incrementTransactionBeginCounter();
         }
     }
 
@@ -1143,10 +1110,8 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
         finally
         {
             _rollingBack = false;
-
-            _txnRejects.incrementAndGet();
-            _txnStarts.incrementAndGet();
-            decrementOutstandingTxnsIfNecessary();
+            _connection.incrementTransactionRollbackCounter();
+            _connection.incrementTransactionBeginCounter();
         }
 
         postRollbackTask.run();
@@ -3368,15 +3333,18 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
             closeChannel(ErrorCodes.COMMAND_INVALID,
                          "Fatal error: commit called on non-transactional channel");
         }
-        commit(new Runnable()
+        else
         {
-
-            @Override
-            public void run()
+            commit(new Runnable()
             {
-                _connection.writeFrame(_txCommitOkFrame);
-            }
-        }, true);
+
+                @Override
+                public void run()
+                {
+                    _connection.writeFrame(_txCommitOkFrame);
+                }
+            }, true);
+        }
 
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
index abe40f9..79b2c66 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
@@ -73,6 +73,8 @@ import org.apache.qpid.server.transport.ByteBufferSender;
 import org.apache.qpid.server.transport.ProtocolEngine;
 import org.apache.qpid.server.transport.ServerNetworkConnection;
 import org.apache.qpid.server.transport.TransportException;
+import org.apache.qpid.server.txn.LocalTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
@@ -871,6 +873,16 @@ public class AMQPConnection_0_8Impl
     }
 
     @Override
+    public Iterator<ServerTransaction> getOpenTransactions()
+    {
+        return _channelMap.values()
+                          .stream()
+                          .filter(channel -> channel.getTransaction() instanceof LocalTransaction)
+                          .map(AMQChannel::getTransaction)
+                          .iterator();
+    }
+
+    @Override
     public void receiveChannelOpen(final int channelId)
     {
         if(_logger.isDebugEnabled())
@@ -1245,6 +1257,10 @@ public class AMQPConnection_0_8Impl
                                                   deliveryTag,
                                                   target.getConsumerTag());
             registerMessageDelivered(size);
+            if (target.getChannel().isTransactional())
+            {
+                registerTransactedMessageDelivered();
+            }
             return size;
         }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
index 660d995..53f47d1 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
@@ -20,8 +20,6 @@
 
 package org.apache.qpid.server.protocol.v1_0;
 
-import java.util.Iterator;
-
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.logging.EventLoggerProvider;
 import org.apache.qpid.server.model.DerivedAttribute;
@@ -82,7 +80,6 @@ public interface AMQPConnection_1_0<C extends AMQPConnection_1_0<C>> extends AMQ
 
     void close(Error error);
 
-    Iterator<IdentifiedTransaction> getOpenTransactions();
     IdentifiedTransaction createIdentifiedTransaction();
     ServerTransaction getTransaction(int txnId);
     void removeTransaction(int txnId);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
index bbdc148..b2384bd 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
@@ -1837,9 +1837,9 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
     }
 
     @Override
-    public Iterator<IdentifiedTransaction> getOpenTransactions()
+    public Iterator<ServerTransaction> getOpenTransactions()
     {
-        return new Iterator<IdentifiedTransaction>()
+        return new Iterator<ServerTransaction>()
         {
             int _index = 0;
 
@@ -1857,7 +1857,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
             }
 
             @Override
-            public IdentifiedTransaction next()
+            public ServerTransaction next()
             {
                 IdentifiedTransaction txn;
                 for( ; _index < _openTransactions.length; _index++)
@@ -1866,7 +1866,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
                     {
                         txn = new IdentifiedTransaction(_index, _openTransactions[_index]);
                         _index++;
-                        return txn;
+                        return txn.getServerTransaction();
                     }
                 }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index 5cfb7c4..0c0cb8b 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -415,6 +415,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
                 try
                 {
                     txn = _linkEndpoint.getTransaction(transactionId);
+                    getSession().getConnection().registerTransactedMessageDelivered();
                 }
                 catch (UnknownTransactionException e)
                 {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index bdc43c0..d74828e 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -149,9 +149,6 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
 
     private final String _primaryDomain;
     private final Set<Object> _blockingEntities = Collections.newSetFromMap(new ConcurrentHashMap<>());
-    private volatile long _startedTransactions;
-    private volatile long _committedTransactions;
-    private volatile long _rolledBackTransactions;
 
     public Session_1_0(final AMQPConnection_1_0 connection,
                        Begin begin,
@@ -1094,24 +1091,6 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
     }
 
     @Override
-    public long getTxnStart()
-    {
-        return _startedTransactions;
-    }
-
-    @Override
-    public long getTxnCommits()
-    {
-        return _committedTransactions;
-    }
-
-    @Override
-    public long getTxnRejects()
-    {
-        return _rolledBackTransactions;
-    }
-
-    @Override
     public String toLogString()
     {
         final AMQPConnection<?> amqpConnection = getAMQPConnection();
@@ -1188,21 +1167,6 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
         getAMQPConnection().closeSessionAsync(this, AMQPConnection.CloseReason.TRANSACTION_TIMEOUT, reason);
     }
 
-    void incrementStartedTransactions()
-    {
-        _startedTransactions++;
-    }
-
-    void incrementCommittedTransactions()
-    {
-        _committedTransactions++;
-    }
-
-    void incrementRolledBackTransactions()
-    {
-        _rolledBackTransactions++;
-    }
-
     @Override
     public String toString()
     {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index f8f12aa..7dcf31a 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -268,6 +268,7 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
                         sourceSupportedOutcomes.add(Accepted.ACCEPTED_SYMBOL);
                     }
 
+                    boolean transacted = transactionId != null && transaction instanceof LocalTransaction;
                     if (sourceSupportedOutcomes.contains(outcome.getSymbol()))
                     {
                         if (transactionId == null)
@@ -284,8 +285,7 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
                     }
                     else
                     {
-                        if(transactionId != null && transaction instanceof LocalTransaction
-                           && source.getDefaultOutcome() != null
+                        if(transacted && source.getDefaultOutcome() != null
                            && outcome.getSymbol() != source.getDefaultOutcome().getSymbol())
                         {
                             ((LocalTransaction) transaction).setRollbackOnly();
@@ -297,8 +297,11 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
 
                     updateDisposition(delivery.getDeliveryTag(), resultantState, settled);
 
-                    getSession().getAMQPConnection()
-                                .registerMessageReceived(serverMessage.getSize());
+                    getSession().getAMQPConnection().registerMessageReceived(serverMessage.getSize());
+                    if (transacted)
+                    {
+                        getSession().getAMQPConnection().registerTransactedMessageReceived();
+                    }
 
                     setRollbackOnly = false;
                 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
index 0f98e0b..4535ea4 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
@@ -93,8 +93,6 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn
 
                             Declared state = new Declared();
 
-                            session.incrementStartedTransactions();
-
                             state.setTxnId(Session_1_0.integerToTransactionId(txn.getId()));
                             updateDisposition(delivery.getDeliveryTag(), state, true);
                         }
@@ -168,26 +166,27 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn
 
         if(txn != null)
         {
+            AMQPConnection_1_0<?> connection = getSession().getConnection();
             if(fail)
             {
                 txn.rollback();
-                getSession().incrementRolledBackTransactions();
+                connection.incrementTransactionRollbackCounter();
             }
             else if(!(txn instanceof LocalTransaction && ((LocalTransaction)txn).isRollbackOnly()))
             {
                 txn.commit();
-                getSession().incrementCommittedTransactions();
             }
             else
             {
                 txn.rollback();
-                getSession().incrementRolledBackTransactions();
+                connection.incrementTransactionRollbackCounter();
                 error = new Error();
                 error.setCondition(TransactionError.TRANSACTION_ROLLBACK);
                 error.setDescription("The transaction was marked as rollback only due to an earlier issue (e.g. a published message was sent settled but could not be enqueued)");
             }
             _createdTransactions.remove(transactionId);
-            getSession().getConnection().removeTransaction(transactionId);
+            connection.removeTransaction(transactionId);
+            connection.decrementTransactionOpenCounter();
         }
         else
         {
@@ -205,8 +204,10 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn
         for(Map.Entry<Integer, ServerTransaction> entry : _createdTransactions.entrySet())
         {
             entry.getValue().rollback();
-            getSession().incrementRolledBackTransactions();
-            getSession().getConnection().removeTransaction(entry.getKey());
+            AMQPConnection_1_0<?> connection = getSession().getConnection();
+            connection.decrementTransactionOpenCounter();
+            connection.incrementTransactionRollbackCounter();
+            connection.removeTransaction(entry.getKey());
         }
         close();
     }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-plugins/management-http/src/main/java/resources/css/common.css
----------------------------------------------------------------------
diff --git a/broker-plugins/management-http/src/main/java/resources/css/common.css b/broker-plugins/management-http/src/main/java/resources/css/common.css
index ea92efc..bc92699 100644
--- a/broker-plugins/management-http/src/main/java/resources/css/common.css
+++ b/broker-plugins/management-http/src/main/java/resources/css/common.css
@@ -567,11 +567,9 @@ td.advancedSearchField, col.autoWidth {
 .connectionConsumers .field-msgOutRate { width: 10% }
 .connectionConsumers .field-bytesOutRate { width: 10% }
 
-.connectionSessions .field-name { width: 30%; }
-.connectionSessions .field-consumerCount { width: 10% }
-.connectionSessions .field-unacknowledgedMessages { width: 20% }
-.connectionSessions .field-transactionStartTime { width: 20% }
-.connectionSessions .field-transactionUpdateTime { width: 20% }
+.connectionSessions .field-name { width: 50%; }
+.connectionSessions .field-consumerCount { width: 25% }
+.connectionSessions .field-unacknowledgedMessages { width: 25% }
 
 .radioButtonIndent {
     padding-left: 20px;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-plugins/management-http/src/main/java/resources/js/qpid/common/StatisticsWidget.js
----------------------------------------------------------------------
diff --git a/broker-plugins/management-http/src/main/java/resources/js/qpid/common/StatisticsWidget.js b/broker-plugins/management-http/src/main/java/resources/js/qpid/common/StatisticsWidget.js
index 194a199..34db715 100644
--- a/broker-plugins/management-http/src/main/java/resources/js/qpid/common/StatisticsWidget.js
+++ b/broker-plugins/management-http/src/main/java/resources/js/qpid/common/StatisticsWidget.js
@@ -371,10 +371,10 @@ define(["dojox/lang/functional/object",
                     }
                     else if (units === "ABSOLUTE_TIME")
                     {
-                        return this._userPreferences.formatDateTime(value, {
+                        return value > 0 ? this._userPreferences.formatDateTime(value, {
                             addOffset: true,
                             appendTimeZone: true
-                        });
+                        }) : "-";
                     }
                     else if (units === "TIME_DURATION")
                     {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Connection.js
----------------------------------------------------------------------
diff --git a/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Connection.js b/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Connection.js
index 6aa7f3d..85a854a 100644
--- a/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Connection.js
+++ b/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Connection.js
@@ -139,22 +139,6 @@ define(["dojo/parser",
 
             this.connectionData = {};
 
-            var transactionTimeFormatter = function (value, object)
-            {
-                if (value > 0)
-                {
-                    return userPreferences.formatDateTime(value, {
-                        selector: "time",
-                        addOffset: true,
-                        appendTimeZone: true
-                    });
-                }
-                else
-                {
-                    return "N/A";
-                }
-            };
-
             this.sessionsGrid = new QueryGrid({
                 detectChanges: true,
                 rowsPerPage: 10,
@@ -175,14 +159,6 @@ define(["dojo/parser",
                     }, {
                         label: "Unacknowledged messages",
                         field: "unacknowledgedMessages"
-                    }, {
-                        label: "Current store transaction start",
-                        field: "transactionStartTime",
-                        formatter: transactionTimeFormatter
-                    }, {
-                        label: "Current store transaction update",
-                        field: "transactionUpdateTime",
-                        formatter:transactionTimeFormatter
                     }
                 ]
             }, findNode("sessions"));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org