You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/12/05 21:08:54 UTC
svn commit: r1772807 - in
/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0:
Session_1_0.java TxnCoordinatorReceivingLink_1_0.java
Author: rgodfrey
Date: Mon Dec 5 21:08:54 2016
New Revision: 1772807
URL: http://svn.apache.org/viewvc?rev=1772807&view=rev
Log:
QPID-7546 : implement transaction counting statistics
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1772807&r1=1772806&r2=1772807&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Mon Dec 5 21:08:54 2016
@@ -49,7 +49,6 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.connection.SessionPrincipal;
-import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.consumer.ScheduledConsumerTargetSet;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.LogMessage;
@@ -176,6 +175,10 @@ public class Session_1_0 implements AMQS
"Force detach the link because the session is remotely ended.");
private final Set<Object> _blockingEntities = Collections.newSetFromMap(new ConcurrentHashMap<Object,Boolean>());
+ private volatile long _startedTransactions;
+ private volatile long _committedTransactions;
+ private volatile long _rolledBackTransactions;
+ private volatile int _unacknowledgedMessages;
public Session_1_0(final AMQPConnection_1_0 connection)
@@ -1213,7 +1216,7 @@ public class Session_1_0 implements AMQS
for(int i = 0; i < txnId.getLength(); i++)
{
id <<= 8;
- id += ((int)data[i+txnId.getArrayOffset()] & 0xff);
+ id |= ((int)data[i+txnId.getArrayOffset()] & 0xff);
}
return id;
@@ -1456,29 +1459,25 @@ public class Session_1_0 implements AMQS
@Override
public int getUnacknowledgedMessageCount()
{
- // TODO
- return 0;
+ return _unacknowledgedMessages;
}
@Override
public long getTxnStart()
{
- // TODO
- return 0l;
+ return _startedTransactions;
}
@Override
public long getTxnCommits()
{
- // TODO
- return 0l;
+ return _committedTransactions;
}
@Override
public long getTxnRejects()
{
- // TODO
- return 0L;
+ return _rolledBackTransactions;
}
@Override
@@ -1668,6 +1667,31 @@ public class Session_1_0 implements AMQS
}
}
+ void incrementStartedTransactions()
+ {
+ _startedTransactions++;
+ }
+
+ void incrementCommittedTransactions()
+ {
+ _committedTransactions++;
+ }
+
+ void incrementRolledBackTransactions()
+ {
+ _rolledBackTransactions++;
+ }
+
+ void incrementUnacknowledged()
+ {
+ _unacknowledgedMessages++;
+ }
+
+ void decrementUnacknowledged()
+ {
+ _unacknowledgedMessages--;
+ }
+
private class ConsumerClosedListener extends AbstractConfigurationChangeListener
{
@Override
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java?rev=1772807&r1=1772806&r2=1772807&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java Mon Dec 5 21:08:54 2016
@@ -74,7 +74,6 @@ public class TxnCoordinatorReceivingLink
public void messageTransfer(Transfer xfr)
{
- // TODO - cope with fragmented messages
QpidByteBuffer payload = null;
@@ -115,7 +114,7 @@ public class TxnCoordinatorReceivingLink
xfr.dispose();
}
- // Only interested int he amqp-value section that holds the message to the coordinator
+ // Only interested int the amqp-value section that holds the message to the coordinator
try
{
List<Section> sections = _sectionDecoder.parseAll(payload);
@@ -141,7 +140,7 @@ public class TxnCoordinatorReceivingLink
Declared state = new Declared();
-
+ _session.incrementStartedTransactions();
state.setTxnId(_session.integerToBinary(txnId));
_endpoint.updateDisposition(deliveryTag, state, true);
@@ -176,7 +175,6 @@ public class TxnCoordinatorReceivingLink
public void remoteDetached(LinkEndpoint endpoint, Detach detach)
{
- //TODO
endpoint.detach();
}
@@ -195,10 +193,12 @@ public class TxnCoordinatorReceivingLink
if(fail)
{
txn.rollback();
+ _session.incrementRolledBackTransactions();
}
else
{
txn.commit();
+ _session.incrementCommittedTransactions();
}
_openTransactions.remove(transactionId);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org