You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2010/03/04 12:18:30 UTC
svn commit: r918941 - in /qpid/trunk/qpid/java/broker/src:
main/java/org/apache/qpid/qmf/ main/java/org/apache/qpid/server/
main/java/org/apache/qpid/server/configuration/
main/java/org/apache/qpid/server/message/
main/java/org/apache/qpid/server/queue...
Author: robbie
Date: Thu Mar 4 11:18:30 2010
New Revision: 918941
URL: http://svn.apache.org/viewvc?rev=918941&view=rev
Log:
QPID-2379: add BytesTxnEnqueues and MsgTxnEnqueues on Queue delegate
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java?rev=918941&r1=918940&r2=918941&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java Thu Mar 4 11:18:30 2010
@@ -21,6 +21,7 @@
package org.apache.qpid.qmf;
+import org.apache.qpid.server.configuration.SessionConfig;
import org.apache.qpid.server.message.*;
import org.apache.qpid.transport.codec.BBEncoder;
@@ -202,4 +203,9 @@
}
}
+ public SessionConfig getSessionConfig()
+ {
+ return null;
+ }
+
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java?rev=918941&r1=918940&r2=918941&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java Thu Mar 4 11:18:30 2010
@@ -912,8 +912,7 @@
public Long getMsgTxnEnqueues()
{
- // TODO
- return 0l;
+ return _obj.getMsgTxnEnqueues();
}
public Long getMsgTxnDequeues()
@@ -954,8 +953,7 @@
public Long getByteTxnEnqueues()
{
- // TODO
- return 0l;
+ return _obj.getByteTxnEnqueues();
}
public Long getByteTxnDequeues()
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=918941&r1=918940&r2=918941&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Thu Mar 4 11:18:30 2010
@@ -312,13 +312,13 @@
}
else
{
- _logger.warn("MESSAGE DISCARDED: No routes for message - " + createAMQMessage(_currentMessage));
+ _logger.warn("MESSAGE DISCARDED: No routes for message - " + createAMQMessage(_currentMessage,isTransactional()));
}
}
else
{
- _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues));
+ _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues, isTransactional()));
incrementOutstandingTxnsIfNecessary();
}
}
@@ -1030,7 +1030,7 @@
}
- private AMQMessage createAMQMessage(IncomingMessage incomingMessage)
+ private AMQMessage createAMQMessage(IncomingMessage incomingMessage, boolean transactional)
throws AMQException
{
@@ -1054,12 +1054,15 @@
private class MessageDeliveryAction implements ServerTransaction.Action
{
+ private boolean _transactional;
private IncomingMessage _incommingMessage;
private ArrayList<? extends BaseQueue> _destinationQueues;
public MessageDeliveryAction(IncomingMessage currentMessage,
- ArrayList<? extends BaseQueue> destinationQueues)
+ ArrayList<? extends BaseQueue> destinationQueues,
+ boolean transactional)
{
+ _transactional = transactional;
_incommingMessage = currentMessage;
_destinationQueues = destinationQueues;
}
@@ -1070,7 +1073,7 @@
{
final boolean immediate = _incommingMessage.isImmediate();
- final AMQMessage amqMessage = createAMQMessage(_incommingMessage);
+ final AMQMessage amqMessage = createAMQMessage(_incommingMessage, _transactional);
MessageReference ref = amqMessage.newReference();
for(final BaseQueue queue : _destinationQueues)
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java?rev=918941&r1=918940&r2=918941&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java Thu Mar 4 11:18:30 2010
@@ -49,6 +49,8 @@
int getConsumerCountHigh();
int getBindingCount();
+
+ int getBindingCountHigh();
ConfigStore getConfigStore();
@@ -57,8 +59,10 @@
long getTotalEnqueueSize();
long getTotalDequeueSize();
+
+ long getByteTxnEnqueues();
- int getBindingCountHigh();
+ long getMsgTxnEnqueues();
long getPersistentByteEnqueues();
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java?rev=918941&r1=918940&r2=918941&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java Thu Mar 4 11:18:30 2010
@@ -46,4 +46,6 @@
Long getTxnRejects();
Long getTxnCount();
+
+ boolean isTransactional();
}
\ No newline at end of file
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java?rev=918941&r1=918940&r2=918941&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java Thu Mar 4 11:18:30 2010
@@ -24,11 +24,14 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.configuration.SessionConfig;
import org.apache.qpid.server.queue.AMQQueue;
import java.util.concurrent.atomic.AtomicInteger;
+import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
/**
@@ -63,9 +66,16 @@
private final StoredMessage<MessageMetaData> _handle;
+ WeakReference<AMQChannel> _channelRef;
+
public AMQMessage(StoredMessage<MessageMetaData> handle)
{
+ this(handle, null);
+ }
+
+ public AMQMessage(StoredMessage<MessageMetaData> handle, WeakReference<AMQChannel> channelRef)
+ {
_handle = handle;
final MessageMetaData metaData = handle.getMetaData();
_size = metaData.getContentSize();
@@ -75,6 +85,8 @@
{
_flags |= IMMEDIATE;
}
+
+ _channelRef = channelRef;
}
@@ -326,4 +338,9 @@
{
return _handle;
}
+
+ public SessionConfig getSessionConfig()
+ {
+ return _channelRef == null ? null : ((SessionConfig) _channelRef.get());
+ }
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java?rev=918941&r1=918940&r2=918941&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java Thu Mar 4 11:18:30 2010
@@ -21,9 +21,10 @@
package org.apache.qpid.server.message;
import org.apache.qpid.transport.*;
+import org.apache.qpid.server.configuration.SessionConfig;
import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.transport.ServerSession;
-import java.util.concurrent.atomic.AtomicLong;
import java.nio.ByteBuffer;
import java.lang.ref.WeakReference;
@@ -37,13 +38,11 @@
private WeakReference<Session> _sessionRef;
-
public MessageTransferMessage(StoredMessage<MessageMetaData_0_10> storeMessage, WeakReference<Session> sessionRef)
{
_storeMessage = storeMessage;
_sessionRef = sessionRef;
-
}
private MessageMetaData_0_10 getMetaData()
@@ -142,5 +141,9 @@
return _sessionRef == null ? null : _sessionRef.get();
}
+ public SessionConfig getSessionConfig()
+ {
+ return _sessionRef == null ? null : (ServerSession) _sessionRef.get();
+ }
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java?rev=918941&r1=918940&r2=918941&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java Thu Mar 4 11:18:30 2010
@@ -22,6 +22,8 @@
import java.nio.ByteBuffer;
+import org.apache.qpid.server.configuration.SessionConfig;
+
public interface ServerMessage extends EnqueableMessage, MessageContentSource
{
String getRoutingKey();
@@ -44,4 +46,5 @@
public int getContent(ByteBuffer buf, int offset);
+ SessionConfig getSessionConfig();
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=918941&r1=918940&r2=918941&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Thu Mar 4 11:18:30 2010
@@ -12,6 +12,7 @@
import org.apache.qpid.server.configuration.ConfiguredObject;
import org.apache.qpid.server.configuration.QueueConfigType;
import org.apache.qpid.server.configuration.QueueConfiguration;
+import org.apache.qpid.server.configuration.SessionConfig;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
@@ -122,6 +123,8 @@
private final AtomicLong _persistentMessageEnqueueCount = new AtomicLong();
private final AtomicLong _persistentMessageDequeueCount = new AtomicLong();
private final AtomicInteger _counsumerCountHigh = new AtomicInteger(0);
+ private final AtomicLong _msgTxnEnqueues = new AtomicLong(0);
+ private final AtomicLong _byteTxnEnqueues = new AtomicLong(0);
private final AtomicInteger _bindingCountHigh = new AtomicInteger();
@@ -516,7 +519,7 @@
public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
{
-
+ incrementTxnEnqueueStats(message);
incrementQueueCount();
incrementQueueSize(message);
_totalMessagesReceived.incrementAndGet();
@@ -666,6 +669,17 @@
{
getAtomicQueueCount().incrementAndGet();
}
+
+ private void incrementTxnEnqueueStats(final ServerMessage message)
+ {
+ SessionConfig session = message.getSessionConfig();
+
+ if(session !=null && session.isTransactional())
+ {
+ _msgTxnEnqueues.incrementAndGet();
+ _byteTxnEnqueues.addAndGet(message.getSize());
+ }
+ }
private void deliverMessage(final Subscription sub, final QueueEntry entry)
throws AMQException
@@ -2064,6 +2078,16 @@
{
return _dequeueSize.get();
}
+
+ public long getByteTxnEnqueues()
+ {
+ return _byteTxnEnqueues.get();
+ }
+
+ public long getMsgTxnEnqueues()
+ {
+ return _msgTxnEnqueues.get();
+ }
public long getPersistentByteEnqueues()
{
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=918941&r1=918940&r2=918941&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Thu Mar 4 11:18:30 2010
@@ -388,6 +388,14 @@
sub.releaseSendLock();
}
}
+
+ public boolean isTransactional()
+ {
+ // this does not look great but there should only be one "non-transactional"
+ // transactional context, while there could be several transactional ones in
+ // theory
+ return !(_transaction instanceof AutoCommitTransaction);
+ }
public void selectTx()
{
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=918941&r1=918940&r2=918941&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java Thu Mar 4 11:18:30 2010
@@ -552,4 +552,14 @@
{
return 0;
}
+
+ public long getByteTxnEnqueues()
+ {
+ return 0;
+ }
+
+ public long getMsgTxnEnqueues()
+ {
+ return 0;
+ }
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org