You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/10/26 16:10:11 UTC
[2/2] qpid-broker-j git commit: QPID-7646: [Broker-J] Improve
transaction related statistics
QPID-7646: [Broker-J] Improve transaction related statistics
* Move transaction begin/rollback/open statistics from session into connection
* Add connection statistics oldestTransactionStartTime
* Remove transaction related statistics from session
* Add statistics for transacted message count
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/4e8ecd7b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/4e8ecd7b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/4e8ecd7b
Branch: refs/heads/master
Commit: 4e8ecd7b1fe4e70c165f79dc72d15df1bf8fd0c8
Parents: cb6e719
Author: Alex Rudyy <or...@apache.org>
Authored: Wed Oct 25 15:14:49 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Thu Oct 26 17:05:01 2017 +0100
----------------------------------------------------------------------
.../org/apache/qpid/server/model/Broker.java | 13 +
.../apache/qpid/server/model/BrokerImpl.java | 53 +++-
.../apache/qpid/server/model/Connection.java | 31 ++
.../org/apache/qpid/server/model/Session.java | 27 --
.../apache/qpid/server/session/AMQPSession.java | 6 -
.../server/session/AbstractAMQPSession.java | 32 --
.../qpid/server/stats/StatisticsGatherer.java | 8 +-
.../qpid/server/transport/AMQPConnection.java | 16 +
.../transport/AbstractAMQPConnection.java | 124 +++++++-
.../qpid/server/txn/LocalTransaction.java | 12 +
.../server/virtualhost/AbstractVirtualHost.java | 54 +++-
.../virtualhost/QueueManagingVirtualHost.java | 12 +
.../protocol/v0_10/AMQPConnection_0_10Impl.java | 13 +
.../server/protocol/v0_10/ServerSession.java | 79 ++---
.../server/protocol/v0_10/Session_0_10.java | 18 --
.../qpid/server/protocol/v0_8/AMQChannel.java | 100 ++----
.../protocol/v0_8/AMQPConnection_0_8Impl.java | 16 +
.../protocol/v1_0/AMQPConnection_1_0.java | 3 -
.../protocol/v1_0/AMQPConnection_1_0Impl.java | 8 +-
.../protocol/v1_0/ConsumerTarget_1_0.java | 1 +
.../qpid/server/protocol/v1_0/Session_1_0.java | 36 ---
.../v1_0/StandardReceivingLinkEndpoint.java | 11 +-
.../TxnCoordinatorReceivingLinkEndpoint.java | 17 +-
.../src/main/java/resources/css/common.css | 8 +-
.../js/qpid/common/StatisticsWidget.js | 4 +-
.../resources/js/qpid/management/Connection.js | 24 --
.../qpid/systest/rest/ConnectionRestTest.java | 308 +++++++++----------
.../qpid/systest/rest/SessionRestTest.java | 160 ++++++++++
test-profiles/Java10Excludes | 2 +-
29 files changed, 696 insertions(+), 500 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java b/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
index e462b83..a10fa55 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
@@ -202,6 +202,19 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
description = "Total number of messages delivered by the Broker.")
long getMessagesOut();
+
+ @SuppressWarnings("unused")
+ @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.MESSAGES,
+ label = "Transacted Inbound",
+ description = "Total number of messages delivered by the Broker within a transaction.")
+ long getTransactedMessagesIn();
+
+ @SuppressWarnings("unused")
+ @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.MESSAGES,
+ label = "Transacted Outbound",
+ description = "Total number of messages received by the Broker within a transaction.")
+ long getTransactedMessagesOut();
+
@ManagedOperation(nonModifying = true,
description = "Initiates an orderly shutdown of the Broker.",
changesConfiguredObjectState = false)
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
index 7c484d3..69212a3 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
@@ -114,7 +114,12 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
private AuthenticationProvider<?> _managementModeAuthenticationProvider;
- private final AtomicLong _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
+ private final AtomicLong _messagesIn = new AtomicLong();
+ private final AtomicLong _messagesOut = new AtomicLong();
+ private final AtomicLong _transactedMessagesIn = new AtomicLong();
+ private final AtomicLong _transactedMessagesOut = new AtomicLong();
+ private final AtomicLong _bytesIn = new AtomicLong();
+ private final AtomicLong _bytesOut = new AtomicLong();
@ManagedAttributeField
private int _statisticsReportingPeriod;
@@ -165,12 +170,6 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
QpidServiceLoader qpidServiceLoader = new QpidServiceLoader();
final Set<String> systemNodeCreatorTypes = qpidServiceLoader.getInstancesByType(SystemNodeCreator.class).keySet();
_virtualHostPropertiesNodeEnabled = systemNodeCreatorTypes.contains(VirtualHostPropertiesNodeCreator.TYPE);
- _messagesDelivered = new AtomicLong();
- _dataDelivered = new AtomicLong();
- _messagesReceived = new AtomicLong();
- _dataReceived = new AtomicLong();
-
-
}
private void registerSystemAddressSpaces()
@@ -809,15 +808,27 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
@Override
public void registerMessageDelivered(long messageSize)
{
- _messagesDelivered.incrementAndGet();
- _dataDelivered.addAndGet(messageSize);
+ _messagesOut.incrementAndGet();
+ _bytesOut.addAndGet(messageSize);
+ }
+
+ @Override
+ public void registerTransactedMessageReceived()
+ {
+ _transactedMessagesIn.incrementAndGet();
+ }
+
+ @Override
+ public void registerTransactedMessageDelivered()
+ {
+ _transactedMessagesOut.incrementAndGet();
}
@Override
public void registerMessageReceived(long messageSize)
{
- _messagesReceived.incrementAndGet();
- _dataReceived.addAndGet(messageSize);
+ _messagesIn.incrementAndGet();
+ _bytesIn.addAndGet(messageSize);
}
@Override
@@ -841,25 +852,37 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
@Override
public long getMessagesIn()
{
- return _messagesReceived.get();
+ return _messagesIn.get();
}
@Override
public long getBytesIn()
{
- return _dataReceived.get();
+ return _bytesIn.get();
}
@Override
public long getMessagesOut()
{
- return _messagesDelivered.get();
+ return _messagesOut.get();
}
@Override
public long getBytesOut()
{
- return _dataDelivered.get();
+ return _bytesOut.get();
+ }
+
+ @Override
+ public long getTransactedMessagesIn()
+ {
+ return _transactedMessagesIn.get();
+ }
+
+ @Override
+ public long getTransactedMessagesOut()
+ {
+ return _transactedMessagesOut.get();
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java b/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
index 95db723..14ee17a 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
@@ -147,6 +147,37 @@ public interface Connection<X extends Connection<X>> extends ConfiguredObject<X>
description = "Current number of sessions belonging to this connection.")
int getSessionCount();
+ @SuppressWarnings("unused")
+ @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.COUNT,
+ label = "Transactions", description = "Total number of transactions started.")
+ long getLocalTransactionBegins();
+
+ @SuppressWarnings("unused")
+ @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.COUNT,
+ label = "Rolled-back Transactions", description = "Total number of rolled-back transactions.")
+ long getLocalTransactionRollbacks();
+
+ @SuppressWarnings("unused")
+ @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.COUNT,
+ label = "Open Transactions", description = "Current number of open transactions.")
+ long getLocalTransactionOpen();
+
+ @SuppressWarnings("unused")
+ @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.ABSOLUTE_TIME,
+ label= "Oldest transaction start time",
+ description = "The start time of the oldest transaction or null if no transaction is in progress.")
+ Date getOldestTransactionStartTime();
+
+ @SuppressWarnings("unused")
+ @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.MESSAGES, label = "Transacted Inbound",
+ description = "Total number of messages delivered by this connection within a transaction.")
+ long getTransactedMessagesIn();
+
+ @SuppressWarnings("unused")
+ @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.MESSAGES, label = "Transacted Outbound",
+ description = "Total number of messages received by this connection within a transaction.")
+ long getTransactedMessagesOut();
+
//children
Collection<Session> getSessions();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Session.java b/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
index 91f7cdb..ccb396a 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.server.model;
-import java.util.Date;
-
@ManagedObject( creatable = false, amqpName = "org.apache.qpid.Session")
public interface Session<X extends Session<X>> extends ConfiguredObject<X>
{
@@ -58,31 +56,6 @@ public interface Session<X extends Session<X>> extends ConfiguredObject<X>
@ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.COUNT, label = "Consumers")
long getConsumerCount();
- @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.COUNT, label = "Transactions")
- long getLocalTransactionBegins();
-
- @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.COUNT, label = "Open Transactions")
- int getLocalTransactionOpen();
-
- @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.COUNT, label = "Rolled-back Transactions")
- long getLocalTransactionRollbacks();
-
@ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.MESSAGES, label = "Prefetched")
long getUnacknowledgedMessages();
-
- /**
- * Return the time the current transaction started.
- *
- * @return the time this transaction started or 0 if not in a transaction
- */
- @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.ABSOLUTE_TIME, label = "Last Transaction Start")
- Date getTransactionStartTime();
-
- /**
- * Return the time of the last activity on the current transaction.
- *
- * @return the time of the last activity or 0 if not in a transaction
- */
- @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.ABSOLUTE_TIME, label = "Last Transaction Update")
- Date getTransactionUpdateTime();
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java b/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
index 4c24e71..1b224f1 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
@@ -59,12 +59,6 @@ public interface AMQPSession<S extends org.apache.qpid.server.session.AMQPSessio
int getUnacknowledgedMessageCount();
- long getTxnStart();
-
- long getTxnCommits();
-
- long getTxnRejects();
-
@Override
long getConsumerCount();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
index 075e213..b68ed7f 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
@@ -22,7 +22,6 @@ package org.apache.qpid.server.session;
import java.security.AccessControlContext;
import java.util.ArrayList;
-import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -170,25 +169,6 @@ public abstract class AbstractAMQPSession<S extends AbstractAMQPSession<S, X>,
}
@Override
- public int getLocalTransactionOpen()
- {
- long open = getTxnStart() - (getTxnCommits() + getTxnRejects());
- return (open > 0L) ? 1 : 0;
- }
-
- @Override
- public long getLocalTransactionBegins()
- {
- return getTxnStart();
- }
-
- @Override
- public long getLocalTransactionRollbacks()
- {
- return getTxnRejects();
- }
-
- @Override
public long getUnacknowledgedMessages()
{
return getUnacknowledgedMessageCount();
@@ -206,18 +186,6 @@ public abstract class AbstractAMQPSession<S extends AbstractAMQPSession<S, X>,
_taskList.remove(task);
}
- @Override
- public Date getTransactionStartTime()
- {
- return new Date(getTransactionStartTimeLong());
- }
-
- @Override
- public Date getTransactionUpdateTime()
- {
- return new Date(getTransactionUpdateTimeLong());
- }
-
@StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
private ListenableFuture<Void> doDelete()
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java b/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java
index ecc6dbd..72f6513 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java
@@ -42,7 +42,9 @@ public interface StatisticsGatherer
* @param messageSize the size in bytes of the delivered message
*/
void registerMessageReceived(long messageSize);
-
+
+ void registerTransactedMessageReceived();
+
/**
* This method is responsible for registering the delivery of a message
* with the counters.
@@ -50,7 +52,9 @@ public interface StatisticsGatherer
* @param messageSize the size in bytes of the delivered message
*/
void registerMessageDelivered(long messageSize);
-
+
+ void registerTransactedMessageDelivered();
+
/**
* Returns a number of delivered messages
*
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java b/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
index 2ecef32..6da15b1 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
@@ -24,6 +24,7 @@ import java.net.SocketAddress;
import java.security.AccessControlContext;
import java.security.Principal;
import java.util.Collection;
+import java.util.Iterator;
import javax.security.auth.Subject;
@@ -35,6 +36,7 @@ import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.session.AMQPSession;
import org.apache.qpid.server.txn.LocalTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Deletable;
public interface AMQPConnection<C extends AMQPConnection<C>>
@@ -70,6 +72,10 @@ public interface AMQPConnection<C extends AMQPConnection<C>>
// See also QPID-7689: https://issues.apache.org/jira/browse/QPID-7689?focusedCommentId=16022923#comment-16022923
void registerMessageDelivered(long size);
+ void registerTransactedMessageReceived();
+
+ void registerTransactedMessageDelivered();
+
void closeSessionAsync(AMQPSession<?,?> session, CloseReason reason, String message);
SocketAddress getRemoteSocketAddress();
@@ -88,6 +94,16 @@ public interface AMQPConnection<C extends AMQPConnection<C>>
LocalTransaction createLocalTransaction();
+ void incrementTransactionRollbackCounter();
+
+ void decrementTransactionOpenCounter();
+
+ void incrementTransactionOpenCounter();
+
+ void incrementTransactionBeginCounter();
+
+ Iterator<ServerTransaction> getOpenTransactions();
+
enum CloseReason
{
MANAGEMENT,
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java b/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
index 065cc5b..0b16b07 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
@@ -30,6 +30,7 @@ import java.security.PrivilegedAction;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -73,6 +74,7 @@ import org.apache.qpid.server.transport.network.NetworkConnection;
import org.apache.qpid.server.transport.network.Ticker;
import org.apache.qpid.server.txn.FlowToDiskTransactionObserver;
import org.apache.qpid.server.txn.LocalTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.TransactionObserver;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.FixedKeyMapCreator;
@@ -105,7 +107,17 @@ public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,
private String _clientId;
private volatile boolean _stopped;
- private final AtomicLong _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
+
+ private final AtomicLong _messagesIn = new AtomicLong();
+ private final AtomicLong _messagesOut = new AtomicLong();
+ private final AtomicLong _transactedMessagesIn = new AtomicLong();
+ private final AtomicLong _transactedMessagesOut = new AtomicLong();
+ private final AtomicLong _bytesIn = new AtomicLong();
+ private final AtomicLong _bytesOut = new AtomicLong();
+ private final AtomicLong _localTransactionBegins = new AtomicLong();
+ private final AtomicLong _localTransactionRollbacks = new AtomicLong();
+ private final AtomicLong _localTransactionOpens = new AtomicLong();
+
private final SettableFuture<Void> _transportClosedFuture = SettableFuture.create();
private final SettableFuture<Void> _modelClosedFuture = SettableFuture.create();
private final AtomicBoolean _modelClosing = new AtomicBoolean();
@@ -147,11 +159,6 @@ public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,
updateAccessControllerContext();
- _messagesDelivered = new AtomicLong();
- _dataDelivered = new AtomicLong();
- _messagesReceived = new AtomicLong();
- _dataReceived = new AtomicLong();
-
_transportClosedFuture.addListener(
new Runnable()
{
@@ -431,19 +438,33 @@ public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,
@Override
public void registerMessageDelivered(long messageSize)
{
- _messagesDelivered.incrementAndGet();
- _dataDelivered.addAndGet(messageSize);
+ _messagesOut.incrementAndGet();
+ _bytesOut.addAndGet(messageSize);
_statisticsGatherer.registerMessageDelivered(messageSize);
}
@Override
public void registerMessageReceived(long messageSize)
{
- _messagesReceived.incrementAndGet();
- _dataReceived.addAndGet(messageSize);
+ _messagesIn.incrementAndGet();
+ _bytesIn.addAndGet(messageSize);
_statisticsGatherer.registerMessageReceived(messageSize);
}
+ @Override
+ public void registerTransactedMessageDelivered()
+ {
+ _transactedMessagesOut.incrementAndGet();
+ _statisticsGatherer.registerTransactedMessageDelivered();
+ }
+
+ @Override
+ public void registerTransactedMessageReceived()
+ {
+ _transactedMessagesIn.incrementAndGet();
+ _statisticsGatherer.registerTransactedMessageReceived();
+ }
+
public void setClientProduct(final String clientProduct)
{
_clientProduct = clientProduct;
@@ -631,25 +652,37 @@ public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,
@Override
public long getBytesIn()
{
- return _dataReceived.get();
+ return _bytesIn.get();
}
@Override
public long getBytesOut()
{
- return _dataDelivered.get();
+ return _bytesOut.get();
}
@Override
public long getMessagesIn()
{
- return _messagesReceived.get();
+ return _messagesIn.get();
}
@Override
public long getMessagesOut()
{
- return _messagesDelivered.get();
+ return _messagesOut.get();
+ }
+
+ @Override
+ public long getTransactedMessagesIn()
+ {
+ return _transactedMessagesIn.get();
+ }
+
+ @Override
+ public long getTransactedMessagesOut()
+ {
+ return _transactedMessagesOut.get();
}
public AccessControlContext getAccessControllerContext()
@@ -850,6 +883,8 @@ public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,
@Override
public LocalTransaction createLocalTransaction()
{
+ _localTransactionBegins.incrementAndGet();
+ _localTransactionOpens.incrementAndGet();
return new LocalTransaction(getAddressSpace().getMessageStore(),
() -> getLastReadTime(),
_transactionObserver);
@@ -931,4 +966,65 @@ public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,
return getNetwork().getPeerPrincipal();
}
+ @Override
+ public Date getOldestTransactionStartTime()
+ {
+ long oldest = Long.MAX_VALUE;
+ Iterator<ServerTransaction> iterator = getOpenTransactions();
+ while (iterator.hasNext())
+ {
+ final ServerTransaction value = iterator.next();
+ if (value instanceof LocalTransaction)
+ {
+ long transactionStartTimeLong = value.getTransactionStartTime();
+ if (transactionStartTimeLong > 0 && oldest > transactionStartTimeLong)
+ {
+ oldest = transactionStartTimeLong;
+ }
+ }
+ }
+ return oldest == Long.MAX_VALUE ? null : new Date(oldest);
+ }
+
+ @Override
+ public long getLocalTransactionBegins()
+ {
+ return _localTransactionBegins.get();
+ }
+
+ @Override
+ public long getLocalTransactionOpen()
+ {
+ return _localTransactionOpens.get();
+ }
+
+ @Override
+ public long getLocalTransactionRollbacks()
+ {
+ return _localTransactionRollbacks.get();
+ }
+
+ @Override
+ public void incrementTransactionRollbackCounter()
+ {
+ _localTransactionRollbacks.incrementAndGet();
+ }
+
+ @Override
+ public void decrementTransactionOpenCounter()
+ {
+ _localTransactionOpens.decrementAndGet();
+ }
+
+ @Override
+ public void incrementTransactionOpenCounter()
+ {
+ _localTransactionOpens.incrementAndGet();
+ }
+
+ @Override
+ public void incrementTransactionBeginCounter()
+ {
+ _localTransactionBegins.incrementAndGet();
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
index 7df5a58..767e0ca 100755
--- a/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
@@ -60,6 +60,7 @@ public class LocalTransaction implements ServerTransaction
private volatile long _txnUpdateTime = 0l;
private ListenableFuture<Runnable> _asyncTran;
private volatile boolean _isRollbackOnly;
+ private volatile boolean _outstandingWork;
public LocalTransaction(MessageStore transactionLog)
{
@@ -103,6 +104,7 @@ public class LocalTransaction implements ServerTransaction
public void dequeue(MessageEnqueueRecord record, Action postTransactionAction)
{
sync();
+ _outstandingWork = true;
_postTransactionActions.add(postTransactionAction);
initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
@@ -129,6 +131,7 @@ public class LocalTransaction implements ServerTransaction
public void dequeue(Collection<MessageInstance> queueEntries, Action postTransactionAction)
{
sync();
+ _outstandingWork = true;
_postTransactionActions.add(postTransactionAction);
initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
@@ -192,6 +195,7 @@ public class LocalTransaction implements ServerTransaction
public void enqueue(TransactionLogResource queue, EnqueueableMessage message, EnqueueAction postTransactionAction)
{
sync();
+ _outstandingWork = true;
initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
_transactionObserver.onMessageEnqueue(this, message);
if(queue.getMessageDurability().persist(message.isPersistent()))
@@ -276,6 +280,7 @@ public class LocalTransaction implements ServerTransaction
public void enqueue(Collection<? extends BaseQueue> queues, EnqueueableMessage message, EnqueueAction postTransactionAction)
{
sync();
+ _outstandingWork = true;
initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
_transactionObserver.onMessageEnqueue(this, message);
try
@@ -525,6 +530,7 @@ public class LocalTransaction implements ServerTransaction
private void resetDetails()
{
+ _outstandingWork = false;
_transactionObserver.onDischarge(this);
_asyncTran = null;
_transaction = null;
@@ -554,4 +560,10 @@ public class LocalTransaction implements ServerTransaction
{
return _isRollbackOnly;
}
+
+
+ public boolean hasOutstandingWork()
+ {
+ return _outstandingWork;
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 389f9a6..062b48d 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -175,7 +175,12 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
private final SystemNodeRegistry _systemNodeRegistry = new SystemNodeRegistry();
- private final AtomicLong _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
+ private final AtomicLong _messagesIn = new AtomicLong();
+ private final AtomicLong _messagesOut = new AtomicLong();
+ private final AtomicLong _transactedMessagesIn = new AtomicLong();
+ private final AtomicLong _transactedMessagesOut = new AtomicLong();
+ private final AtomicLong _bytesIn = new AtomicLong();
+ private final AtomicLong _bytesOut = new AtomicLong();
private volatile LinkRegistryModel _linkRegistry;
private AtomicBoolean _blocked = new AtomicBoolean();
@@ -283,11 +288,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
_eventLogger.message(VirtualHostMessages.CREATED(getName()));
-
- _messagesDelivered = new AtomicLong();
- _dataDelivered = new AtomicLong();
- _messagesReceived = new AtomicLong();
- _dataReceived = new AtomicLong();
_principal = new VirtualHostPrincipal(this);
if (systemConfig.isManagementMode())
@@ -1648,41 +1648,67 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
@Override
public void registerMessageDelivered(long messageSize)
{
- _messagesDelivered.incrementAndGet();
- _dataDelivered.addAndGet(messageSize);
+ _messagesOut.incrementAndGet();
+ _bytesOut.addAndGet(messageSize);
_broker.registerMessageDelivered(messageSize);
}
@Override
public void registerMessageReceived(long messageSize)
{
- _messagesReceived.incrementAndGet();
- _dataReceived.addAndGet(messageSize);
+ _messagesIn.incrementAndGet();
+ _bytesIn.addAndGet(messageSize);
_broker.registerMessageReceived(messageSize);
}
@Override
+ public void registerTransactedMessageReceived()
+ {
+ _transactedMessagesIn.incrementAndGet();
+ _broker.registerTransactedMessageReceived();
+ }
+
+ @Override
+ public void registerTransactedMessageDelivered()
+ {
+ _transactedMessagesOut.incrementAndGet();
+ _broker.registerTransactedMessageDelivered();
+ }
+
+ @Override
public long getMessagesIn()
{
- return _messagesReceived.get();
+ return _messagesIn.get();
}
@Override
public long getBytesIn()
{
- return _dataReceived.get();
+ return _bytesIn.get();
}
@Override
public long getMessagesOut()
{
- return _messagesDelivered.get();
+ return _messagesOut.get();
}
@Override
public long getBytesOut()
{
- return _dataDelivered.get();
+ return _bytesOut.get();
+ }
+
+ @Override
+ public long getTransactedMessagesIn()
+ {
+ return _transactedMessagesIn.get();
+ }
+
+ @Override
+ public long getTransactedMessagesOut()
+ {
+ return _transactedMessagesOut.get();
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
index 42d2419..8ca90f9 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
@@ -221,6 +221,18 @@ public interface QueueManagingVirtualHost<X extends QueueManagingVirtualHost<X>>
long getMessagesOut();
@SuppressWarnings("unused")
+ @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.MESSAGES,
+ label = "Transacted Inbound",
+ description = "Total number of messages delivered by this virtualhost within a transaction.")
+ long getTransactedMessagesIn();
+
+ @SuppressWarnings("unused")
+ @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.MESSAGES,
+ label = "Transacted Outbound",
+ description = "Total number of messages received by this virtualhost within a transaction.")
+ long getTransactedMessagesOut();
+
+ @SuppressWarnings("unused")
@ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.BYTES, label = "Queue Depth",
description = "Current size of all messages enqueued by this virtualhost.")
long getTotalDepthOfQueuesBytes();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
index ec9ebfa..be5c1c1 100755
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
@@ -48,6 +48,8 @@ import org.apache.qpid.server.transport.AggregateTicker;
import org.apache.qpid.server.transport.ByteBufferSender;
import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.transport.ServerNetworkConnection;
+import org.apache.qpid.server.txn.LocalTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
@@ -382,4 +384,15 @@ public class AMQPConnection_0_10Impl extends AbstractAMQPConnection<AMQPConnecti
throw new IllegalStateException("Unsupported state " + state);
}
}
+
+ @Override
+ public Iterator<ServerTransaction> getOpenTransactions()
+ {
+ return getSessionModels().stream()
+ .filter(sessionModel -> sessionModel.getServerSession()
+ .getTransaction() instanceof LocalTransaction)
+ .map(sessionModel -> sessionModel.getServerSession().getTransaction())
+ .iterator();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index aac916a..f2bb0cc 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -60,7 +60,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
@@ -152,12 +151,7 @@ public class ServerSession extends SessionInvoker
private final SortedMap<Integer, MessageDispositionChangeListener> _messageDispositionListenerMap =
new ConcurrentSkipListMap<>();
- private ServerTransaction _transaction;
- private final AtomicLong _txnStarts = new AtomicLong(0);
- private final AtomicLong _txnCommits = new AtomicLong(0);
- private final AtomicLong _txnRejects = new AtomicLong(0);
-
- private final AtomicLong _txnCount = new AtomicLong(0);
+ private volatile ServerTransaction _transaction;
private Map<String, ConsumerTarget_0_10> _subscriptions = new ConcurrentHashMap<String, ConsumerTarget_0_10>();
private AtomicReference<LogMessage> _forcedCloseLogMessage = new AtomicReference<LogMessage>();
@@ -972,7 +966,10 @@ public class ServerSession extends SessionInvoker
exchange.route(message, message.getInitialRoutingAddress(), instanceProperties);
result.send(_transaction, null);
getAMQPConnection().registerMessageReceived(message.getSize());
- incrementOutstandingTxnsIfNecessary();
+ if (isTransactional())
+ {
+ getAMQPConnection().registerTransactedMessageReceived();
+ }
return result;
}
@@ -980,6 +977,10 @@ public class ServerSession extends SessionInvoker
Runnable postIdSettingAction)
{
getAMQPConnection().registerMessageDelivered(xfr.getBodySize());
+ if (_transaction.isTransactional())
+ {
+ getAMQPConnection().registerTransactedMessageDelivered();
+ }
invoke(xfr, postIdSettingAction);
}
@@ -1136,8 +1137,14 @@ public class ServerSession extends SessionInvoker
public void onClose()
{
+ AMQPConnection_0_10 amqpConnection = getAMQPConnection();
if(_transaction instanceof LocalTransaction)
{
+ if (((LocalTransaction) _transaction).hasOutstandingWork())
+ {
+ amqpConnection.incrementTransactionRollbackCounter();
+ }
+ amqpConnection.decrementTransactionOpenCounter();
_transaction.rollback();
}
else if(_transaction instanceof DistributedTransaction)
@@ -1161,7 +1168,7 @@ public class ServerSession extends SessionInvoker
{
operationalLoggingMessage = ChannelMessages.CLOSE();
}
- getAMQPConnection().getEventLogger().message(getLogSubject(), operationalLoggingMessage);
+ amqpConnection.getEventLogger().message(getLogSubject(), operationalLoggingMessage);
}
protected void awaitClose()
@@ -1225,10 +1232,14 @@ public class ServerSession extends SessionInvoker
return _transaction.isTransactional();
}
+ ServerTransaction getTransaction()
+ {
+ return _transaction;
+ }
+
public void selectTx()
{
_transaction = getConnection().getAmqpConnection().createLocalTransaction();
- _txnStarts.incrementAndGet();
}
public void selectDtx()
@@ -1330,50 +1341,15 @@ public class ServerSession extends SessionInvoker
public void commit()
{
_transaction.commit();
-
- _txnCommits.incrementAndGet();
- _txnStarts.incrementAndGet();
- decrementOutstandingTxnsIfNecessary();
+ getAMQPConnection().incrementTransactionBeginCounter();
}
public void rollback()
{
_transaction.rollback();
-
- _txnRejects.incrementAndGet();
- _txnStarts.incrementAndGet();
- decrementOutstandingTxnsIfNecessary();
- }
-
-
- private void incrementOutstandingTxnsIfNecessary()
- {
- if(isTransactional())
- {
- //There can currently only be at most one outstanding transaction
- //due to only having LocalTransaction support. Set value to 1 if 0.
- _txnCount.compareAndSet(0,1);
- }
- }
-
- private void decrementOutstandingTxnsIfNecessary()
- {
- if(isTransactional())
- {
- //There can currently only be at most one outstanding transaction
- //due to only having LocalTransaction support. Set value to 0 if 1.
- _txnCount.compareAndSet(1,0);
- }
- }
-
- public long getTxnCommits()
- {
- return _txnCommits.get();
- }
-
- public long getTxnRejects()
- {
- return _txnRejects.get();
+ AMQPConnection_0_10 amqpConnection = getAMQPConnection();
+ amqpConnection.incrementTransactionRollbackCounter();
+ amqpConnection.incrementTransactionBeginCounter();
}
public int getChannelId()
@@ -1381,11 +1357,6 @@ public class ServerSession extends SessionInvoker
return getChannel();
}
- public long getTxnStart()
- {
- return _txnStarts.get();
- }
-
public Principal getAuthorizedPrincipal()
{
return getConnection().getAuthorizedPrincipal();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
index 581e4ea..62e98fa 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
@@ -107,24 +107,6 @@ public class Session_0_10 extends AbstractAMQPSession<Session_0_10, ConsumerTarg
}
@Override
- public long getTxnRejects()
- {
- return _serverSession.getTxnRejects();
- }
-
- @Override
- public long getTxnCommits()
- {
- return _serverSession.getTxnCommits();
- }
-
- @Override
- public long getTxnStart()
- {
- return _serverSession.getTxnStart();
- }
-
- @Override
public int getUnacknowledgedMessageCount()
{
return _serverSession.getUnacknowledgedMessageCount();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index c7c0f3e..29a591a 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -40,7 +40,6 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import javax.security.auth.Subject;
@@ -164,12 +163,7 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
private final AtomicBoolean _suspended = new AtomicBoolean(false);
- private ServerTransaction _transaction;
-
- private final AtomicLong _txnStarts = new AtomicLong(0);
- private final AtomicLong _txnCommits = new AtomicLong(0);
- private final AtomicLong _txnRejects = new AtomicLong(0);
- private final AtomicLong _txnCount = new AtomicLong(0);
+ private volatile ServerTransaction _transaction;
private final AMQPConnection_0_8 _connection;
private final AtomicBoolean _closing = new AtomicBoolean(false);
@@ -299,14 +293,18 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
private void setLocalTransactional()
{
_transaction = _connection.createLocalTransaction();
- _txnStarts.incrementAndGet();
}
- private boolean isTransactional()
+ boolean isTransactional()
{
return _transaction.isTransactional();
}
+ ServerTransaction getTransaction()
+ {
+ return _transaction;
+ }
+
public void receivedComplete()
{
AccessController.doPrivileged(new PrivilegedAction<Void>()
@@ -321,44 +319,6 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
}
- private void incrementOutstandingTxnsIfNecessary()
- {
- if(isTransactional())
- {
- //There can currently only be at most one outstanding transaction
- //due to only having LocalTransaction support. Set value to 1 if 0.
- _txnCount.compareAndSet(0,1);
- }
- }
-
- private void decrementOutstandingTxnsIfNecessary()
- {
- if(isTransactional())
- {
- //There can currently only be at most one outstanding transaction
- //due to only having LocalTransaction support. Set value to 0 if 1.
- _txnCount.compareAndSet(1,0);
- }
- }
-
- @Override
- public long getTxnCommits()
- {
- return _txnCommits.get();
- }
-
- @Override
- public long getTxnRejects()
- {
- return _txnRejects.get();
- }
-
- @Override
- public long getTxnStart()
- {
- return _txnStarts.get();
- }
-
private void setPublishFrame(MessagePublishInfo info, final MessageDestination e)
{
_currentMessage = new IncomingMessage(info);
@@ -527,13 +487,16 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
.createBasicAckBody(_confirmedMessageCounter, false);
_connection.writeFrame(responseBody.generateFrame(_channelId));
}
- incrementOutstandingTxnsIfNecessary();
}
}
}
finally
{
_connection.registerMessageReceived(bodySize);
+ if (isTransactional())
+ {
+ _connection.registerTransactedMessageReceived();
+ }
_currentMessage = null;
}
}
@@ -794,6 +757,15 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
task.performAction(this);
}
+ if (_transaction instanceof LocalTransaction)
+ {
+ if (((LocalTransaction) _transaction).hasOutstandingWork())
+ {
+ _connection.incrementTransactionRollbackCounter();
+ }
+ _connection.decrementTransactionOpenCounter();
+ }
+
_transaction.rollback();
requeue();
@@ -1111,9 +1083,7 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
}
finally
{
- _txnCommits.incrementAndGet();
- _txnStarts.incrementAndGet();
- decrementOutstandingTxnsIfNecessary();
+ _connection.incrementTransactionBeginCounter();
}
}
});
@@ -1121,10 +1091,7 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
else
{
_transaction.commit(immediateAction);
-
- _txnCommits.incrementAndGet();
- _txnStarts.incrementAndGet();
- decrementOutstandingTxnsIfNecessary();
+ _connection.incrementTransactionBeginCounter();
}
}
@@ -1143,10 +1110,8 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
finally
{
_rollingBack = false;
-
- _txnRejects.incrementAndGet();
- _txnStarts.incrementAndGet();
- decrementOutstandingTxnsIfNecessary();
+ _connection.incrementTransactionRollbackCounter();
+ _connection.incrementTransactionBeginCounter();
}
postRollbackTask.run();
@@ -3368,15 +3333,18 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
closeChannel(ErrorCodes.COMMAND_INVALID,
"Fatal error: commit called on non-transactional channel");
}
- commit(new Runnable()
+ else
{
-
- @Override
- public void run()
+ commit(new Runnable()
{
- _connection.writeFrame(_txCommitOkFrame);
- }
- }, true);
+
+ @Override
+ public void run()
+ {
+ _connection.writeFrame(_txCommitOkFrame);
+ }
+ }, true);
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
index abe40f9..79b2c66 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
@@ -73,6 +73,8 @@ import org.apache.qpid.server.transport.ByteBufferSender;
import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.transport.ServerNetworkConnection;
import org.apache.qpid.server.transport.TransportException;
+import org.apache.qpid.server.txn.LocalTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
@@ -871,6 +873,16 @@ public class AMQPConnection_0_8Impl
}
@Override
+ public Iterator<ServerTransaction> getOpenTransactions()
+ {
+ return _channelMap.values()
+ .stream()
+ .filter(channel -> channel.getTransaction() instanceof LocalTransaction)
+ .map(AMQChannel::getTransaction)
+ .iterator();
+ }
+
+ @Override
public void receiveChannelOpen(final int channelId)
{
if(_logger.isDebugEnabled())
@@ -1245,6 +1257,10 @@ public class AMQPConnection_0_8Impl
deliveryTag,
target.getConsumerTag());
registerMessageDelivered(size);
+ if (target.getChannel().isTransactional())
+ {
+ registerTransactedMessageDelivered();
+ }
return size;
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
index 660d995..53f47d1 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
@@ -20,8 +20,6 @@
package org.apache.qpid.server.protocol.v1_0;
-import java.util.Iterator;
-
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.logging.EventLoggerProvider;
import org.apache.qpid.server.model.DerivedAttribute;
@@ -82,7 +80,6 @@ public interface AMQPConnection_1_0<C extends AMQPConnection_1_0<C>> extends AMQ
void close(Error error);
- Iterator<IdentifiedTransaction> getOpenTransactions();
IdentifiedTransaction createIdentifiedTransaction();
ServerTransaction getTransaction(int txnId);
void removeTransaction(int txnId);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
index bbdc148..b2384bd 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
@@ -1837,9 +1837,9 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
}
@Override
- public Iterator<IdentifiedTransaction> getOpenTransactions()
+ public Iterator<ServerTransaction> getOpenTransactions()
{
- return new Iterator<IdentifiedTransaction>()
+ return new Iterator<ServerTransaction>()
{
int _index = 0;
@@ -1857,7 +1857,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
}
@Override
- public IdentifiedTransaction next()
+ public ServerTransaction next()
{
IdentifiedTransaction txn;
for( ; _index < _openTransactions.length; _index++)
@@ -1866,7 +1866,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
{
txn = new IdentifiedTransaction(_index, _openTransactions[_index]);
_index++;
- return txn;
+ return txn.getServerTransaction();
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index 5cfb7c4..0c0cb8b 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -415,6 +415,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
try
{
txn = _linkEndpoint.getTransaction(transactionId);
+ getSession().getConnection().registerTransactedMessageDelivered();
}
catch (UnknownTransactionException e)
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index bdc43c0..d74828e 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -149,9 +149,6 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
private final String _primaryDomain;
private final Set<Object> _blockingEntities = Collections.newSetFromMap(new ConcurrentHashMap<>());
- private volatile long _startedTransactions;
- private volatile long _committedTransactions;
- private volatile long _rolledBackTransactions;
public Session_1_0(final AMQPConnection_1_0 connection,
Begin begin,
@@ -1094,24 +1091,6 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
}
@Override
- public long getTxnStart()
- {
- return _startedTransactions;
- }
-
- @Override
- public long getTxnCommits()
- {
- return _committedTransactions;
- }
-
- @Override
- public long getTxnRejects()
- {
- return _rolledBackTransactions;
- }
-
- @Override
public String toLogString()
{
final AMQPConnection<?> amqpConnection = getAMQPConnection();
@@ -1188,21 +1167,6 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
getAMQPConnection().closeSessionAsync(this, AMQPConnection.CloseReason.TRANSACTION_TIMEOUT, reason);
}
- void incrementStartedTransactions()
- {
- _startedTransactions++;
- }
-
- void incrementCommittedTransactions()
- {
- _committedTransactions++;
- }
-
- void incrementRolledBackTransactions()
- {
- _rolledBackTransactions++;
- }
-
@Override
public String toString()
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index f8f12aa..7dcf31a 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -268,6 +268,7 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
sourceSupportedOutcomes.add(Accepted.ACCEPTED_SYMBOL);
}
+ boolean transacted = transactionId != null && transaction instanceof LocalTransaction;
if (sourceSupportedOutcomes.contains(outcome.getSymbol()))
{
if (transactionId == null)
@@ -284,8 +285,7 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
}
else
{
- if(transactionId != null && transaction instanceof LocalTransaction
- && source.getDefaultOutcome() != null
+ if(transacted && source.getDefaultOutcome() != null
&& outcome.getSymbol() != source.getDefaultOutcome().getSymbol())
{
((LocalTransaction) transaction).setRollbackOnly();
@@ -297,8 +297,11 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
updateDisposition(delivery.getDeliveryTag(), resultantState, settled);
- getSession().getAMQPConnection()
- .registerMessageReceived(serverMessage.getSize());
+ getSession().getAMQPConnection().registerMessageReceived(serverMessage.getSize());
+ if (transacted)
+ {
+ getSession().getAMQPConnection().registerTransactedMessageReceived();
+ }
setRollbackOnly = false;
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
index 0f98e0b..4535ea4 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
@@ -93,8 +93,6 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn
Declared state = new Declared();
- session.incrementStartedTransactions();
-
state.setTxnId(Session_1_0.integerToTransactionId(txn.getId()));
updateDisposition(delivery.getDeliveryTag(), state, true);
}
@@ -168,26 +166,27 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn
if(txn != null)
{
+ AMQPConnection_1_0<?> connection = getSession().getConnection();
if(fail)
{
txn.rollback();
- getSession().incrementRolledBackTransactions();
+ connection.incrementTransactionRollbackCounter();
}
else if(!(txn instanceof LocalTransaction && ((LocalTransaction)txn).isRollbackOnly()))
{
txn.commit();
- getSession().incrementCommittedTransactions();
}
else
{
txn.rollback();
- getSession().incrementRolledBackTransactions();
+ connection.incrementTransactionRollbackCounter();
error = new Error();
error.setCondition(TransactionError.TRANSACTION_ROLLBACK);
error.setDescription("The transaction was marked as rollback only due to an earlier issue (e.g. a published message was sent settled but could not be enqueued)");
}
_createdTransactions.remove(transactionId);
- getSession().getConnection().removeTransaction(transactionId);
+ connection.removeTransaction(transactionId);
+ connection.decrementTransactionOpenCounter();
}
else
{
@@ -205,8 +204,10 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn
for(Map.Entry<Integer, ServerTransaction> entry : _createdTransactions.entrySet())
{
entry.getValue().rollback();
- getSession().incrementRolledBackTransactions();
- getSession().getConnection().removeTransaction(entry.getKey());
+ AMQPConnection_1_0<?> connection = getSession().getConnection();
+ connection.decrementTransactionOpenCounter();
+ connection.incrementTransactionRollbackCounter();
+ connection.removeTransaction(entry.getKey());
}
close();
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-plugins/management-http/src/main/java/resources/css/common.css
----------------------------------------------------------------------
diff --git a/broker-plugins/management-http/src/main/java/resources/css/common.css b/broker-plugins/management-http/src/main/java/resources/css/common.css
index ea92efc..bc92699 100644
--- a/broker-plugins/management-http/src/main/java/resources/css/common.css
+++ b/broker-plugins/management-http/src/main/java/resources/css/common.css
@@ -567,11 +567,9 @@ td.advancedSearchField, col.autoWidth {
.connectionConsumers .field-msgOutRate { width: 10% }
.connectionConsumers .field-bytesOutRate { width: 10% }
-.connectionSessions .field-name { width: 30%; }
-.connectionSessions .field-consumerCount { width: 10% }
-.connectionSessions .field-unacknowledgedMessages { width: 20% }
-.connectionSessions .field-transactionStartTime { width: 20% }
-.connectionSessions .field-transactionUpdateTime { width: 20% }
+.connectionSessions .field-name { width: 50%; }
+.connectionSessions .field-consumerCount { width: 25% }
+.connectionSessions .field-unacknowledgedMessages { width: 25% }
.radioButtonIndent {
padding-left: 20px;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-plugins/management-http/src/main/java/resources/js/qpid/common/StatisticsWidget.js
----------------------------------------------------------------------
diff --git a/broker-plugins/management-http/src/main/java/resources/js/qpid/common/StatisticsWidget.js b/broker-plugins/management-http/src/main/java/resources/js/qpid/common/StatisticsWidget.js
index 194a199..34db715 100644
--- a/broker-plugins/management-http/src/main/java/resources/js/qpid/common/StatisticsWidget.js
+++ b/broker-plugins/management-http/src/main/java/resources/js/qpid/common/StatisticsWidget.js
@@ -371,10 +371,10 @@ define(["dojox/lang/functional/object",
}
else if (units === "ABSOLUTE_TIME")
{
- return this._userPreferences.formatDateTime(value, {
+ return value > 0 ? this._userPreferences.formatDateTime(value, {
addOffset: true,
appendTimeZone: true
- });
+ }) : "-";
}
else if (units === "TIME_DURATION")
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4e8ecd7b/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Connection.js
----------------------------------------------------------------------
diff --git a/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Connection.js b/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Connection.js
index 6aa7f3d..85a854a 100644
--- a/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Connection.js
+++ b/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Connection.js
@@ -139,22 +139,6 @@ define(["dojo/parser",
this.connectionData = {};
- var transactionTimeFormatter = function (value, object)
- {
- if (value > 0)
- {
- return userPreferences.formatDateTime(value, {
- selector: "time",
- addOffset: true,
- appendTimeZone: true
- });
- }
- else
- {
- return "N/A";
- }
- };
-
this.sessionsGrid = new QueryGrid({
detectChanges: true,
rowsPerPage: 10,
@@ -175,14 +159,6 @@ define(["dojo/parser",
}, {
label: "Unacknowledged messages",
field: "unacknowledgedMessages"
- }, {
- label: "Current store transaction start",
- field: "transactionStartTime",
- formatter: transactionTimeFormatter
- }, {
- label: "Current store transaction update",
- field: "transactionUpdateTime",
- formatter:transactionTimeFormatter
}
]
}, findNode("sessions"));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org