You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2010/03/04 12:18:30 UTC

svn commit: r918941 - in /qpid/trunk/qpid/java/broker/src: main/java/org/apache/qpid/qmf/ main/java/org/apache/qpid/server/ main/java/org/apache/qpid/server/configuration/ main/java/org/apache/qpid/server/message/ main/java/org/apache/qpid/server/queue...

Author: robbie
Date: Thu Mar  4 11:18:30 2010
New Revision: 918941

URL: http://svn.apache.org/viewvc?rev=918941&view=rev
Log:
QPID-2379: add BytesTxnEnqueues and MsgTxnEnqueues on Queue delegate

Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java?rev=918941&r1=918940&r2=918941&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java Thu Mar  4 11:18:30 2010
@@ -21,6 +21,7 @@
 
 package org.apache.qpid.qmf;
 
+import org.apache.qpid.server.configuration.SessionConfig;
 import org.apache.qpid.server.message.*;
 import org.apache.qpid.transport.codec.BBEncoder;
 
@@ -202,4 +203,9 @@
         }
     }
 
+    public SessionConfig getSessionConfig()
+    {
+        return null;
+    }
+
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java?rev=918941&r1=918940&r2=918941&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java Thu Mar  4 11:18:30 2010
@@ -912,8 +912,7 @@
 
         public Long getMsgTxnEnqueues()
         {
-            // TODO
-            return 0l;
+            return _obj.getMsgTxnEnqueues();
         }
 
         public Long getMsgTxnDequeues()
@@ -954,8 +953,7 @@
 
         public Long getByteTxnEnqueues()
         {
-            // TODO
-            return 0l;
+            return _obj.getByteTxnEnqueues();
         }
 
         public Long getByteTxnDequeues()

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=918941&r1=918940&r2=918941&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Thu Mar  4 11:18:30 2010
@@ -312,13 +312,13 @@
                         }
                         else
                         {
-                            _logger.warn("MESSAGE DISCARDED: No routes for message - " + createAMQMessage(_currentMessage));
+                            _logger.warn("MESSAGE DISCARDED: No routes for message - " + createAMQMessage(_currentMessage,isTransactional()));
                         }
 
                     }
                     else
                     {
-                        _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues));
+                        _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues, isTransactional()));
                         incrementOutstandingTxnsIfNecessary();
                     }
                 }
@@ -1030,7 +1030,7 @@
     }
 
 
-    private AMQMessage createAMQMessage(IncomingMessage incomingMessage)
+    private AMQMessage createAMQMessage(IncomingMessage incomingMessage, boolean transactional)
             throws AMQException
     {
 
@@ -1054,12 +1054,15 @@
 
     private class MessageDeliveryAction implements ServerTransaction.Action
     {
+        private boolean _transactional;
         private IncomingMessage _incommingMessage;
         private ArrayList<? extends BaseQueue> _destinationQueues;
 
         public MessageDeliveryAction(IncomingMessage currentMessage,
-                                     ArrayList<? extends BaseQueue> destinationQueues)
+                                     ArrayList<? extends BaseQueue> destinationQueues,
+                                     boolean transactional)
         {
+            _transactional = transactional;
             _incommingMessage = currentMessage;
             _destinationQueues = destinationQueues;
         }
@@ -1070,7 +1073,7 @@
             {
                 final boolean immediate = _incommingMessage.isImmediate();
 
-                final AMQMessage amqMessage = createAMQMessage(_incommingMessage);
+                final AMQMessage amqMessage = createAMQMessage(_incommingMessage, _transactional);
                 MessageReference ref = amqMessage.newReference();
 
                 for(final BaseQueue queue : _destinationQueues)

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java?rev=918941&r1=918940&r2=918941&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java Thu Mar  4 11:18:30 2010
@@ -49,6 +49,8 @@
     int getConsumerCountHigh();
 
     int getBindingCount();
+    
+    int getBindingCountHigh();
 
     ConfigStore getConfigStore();
 
@@ -57,8 +59,10 @@
     long getTotalEnqueueSize();
 
     long getTotalDequeueSize();
+    
+    long getByteTxnEnqueues();
 
-    int getBindingCountHigh();
+    long getMsgTxnEnqueues();
 
     long getPersistentByteEnqueues();
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java?rev=918941&r1=918940&r2=918941&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java Thu Mar  4 11:18:30 2010
@@ -46,4 +46,6 @@
     Long getTxnRejects();
 
     Long getTxnCount();
+    
+    boolean isTransactional();
 }
\ No newline at end of file

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java?rev=918941&r1=918940&r2=918941&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java Thu Mar  4 11:18:30 2010
@@ -24,11 +24,14 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.configuration.SessionConfig;
 import org.apache.qpid.server.queue.AMQQueue;
 
 
 import java.util.concurrent.atomic.AtomicInteger;
+import java.lang.ref.WeakReference;
 import java.nio.ByteBuffer;
 
 /**
@@ -63,9 +66,16 @@
 
     private final StoredMessage<MessageMetaData> _handle;
 
+    WeakReference<AMQChannel> _channelRef;
+
 
     public AMQMessage(StoredMessage<MessageMetaData> handle)
     {
+        this(handle, null);
+    }
+    
+    public AMQMessage(StoredMessage<MessageMetaData> handle, WeakReference<AMQChannel> channelRef)
+    {
         _handle = handle;
         final MessageMetaData metaData = handle.getMetaData();
         _size = metaData.getContentSize();
@@ -75,6 +85,8 @@
         {
             _flags |= IMMEDIATE;
         }
+        
+        _channelRef = channelRef;
     }
 
 
@@ -326,4 +338,9 @@
     {
         return _handle;
     }
+
+    public SessionConfig getSessionConfig()
+    {
+        return _channelRef == null ? null : ((SessionConfig) _channelRef.get());
+   }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java?rev=918941&r1=918940&r2=918941&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java Thu Mar  4 11:18:30 2010
@@ -21,9 +21,10 @@
 package org.apache.qpid.server.message;
 
 import org.apache.qpid.transport.*;
+import org.apache.qpid.server.configuration.SessionConfig;
 import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.transport.ServerSession;
 
-import java.util.concurrent.atomic.AtomicLong;
 import java.nio.ByteBuffer;
 import java.lang.ref.WeakReference;
 
@@ -37,13 +38,11 @@
 
     private WeakReference<Session> _sessionRef;
 
-
     public MessageTransferMessage(StoredMessage<MessageMetaData_0_10> storeMessage, WeakReference<Session> sessionRef)
     {
 
         _storeMessage = storeMessage;
         _sessionRef = sessionRef;
-
     }
 
     private MessageMetaData_0_10 getMetaData()
@@ -142,5 +141,9 @@
         return _sessionRef == null ? null : _sessionRef.get();
     }
 
+    public SessionConfig getSessionConfig()
+    {
+        return _sessionRef == null ? null : (ServerSession) _sessionRef.get();
+    }
     
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java?rev=918941&r1=918940&r2=918941&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java Thu Mar  4 11:18:30 2010
@@ -22,6 +22,8 @@
 
 import java.nio.ByteBuffer;
 
+import org.apache.qpid.server.configuration.SessionConfig;
+
 public interface ServerMessage extends EnqueableMessage, MessageContentSource
 {
     String getRoutingKey();
@@ -44,4 +46,5 @@
 
     public int getContent(ByteBuffer buf, int offset);
 
+    SessionConfig getSessionConfig();
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=918941&r1=918940&r2=918941&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Thu Mar  4 11:18:30 2010
@@ -12,6 +12,7 @@
 import org.apache.qpid.server.configuration.ConfiguredObject;
 import org.apache.qpid.server.configuration.QueueConfigType;
 import org.apache.qpid.server.configuration.QueueConfiguration;
+import org.apache.qpid.server.configuration.SessionConfig;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.LogSubject;
@@ -122,6 +123,8 @@
     private final AtomicLong _persistentMessageEnqueueCount = new AtomicLong();
     private final AtomicLong _persistentMessageDequeueCount = new AtomicLong();
     private final AtomicInteger _counsumerCountHigh = new AtomicInteger(0);
+    private final AtomicLong _msgTxnEnqueues = new AtomicLong(0);
+    private final AtomicLong _byteTxnEnqueues = new AtomicLong(0);
 
     private final AtomicInteger _bindingCountHigh = new AtomicInteger();
 
@@ -516,7 +519,7 @@
 
     public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
     {
-
+        incrementTxnEnqueueStats(message);
         incrementQueueCount();
         incrementQueueSize(message);
         _totalMessagesReceived.incrementAndGet();
@@ -666,6 +669,17 @@
     {
         getAtomicQueueCount().incrementAndGet();
     }
+    
+    private void incrementTxnEnqueueStats(final ServerMessage message)
+    {
+        SessionConfig session = message.getSessionConfig();
+        
+        if(session !=null && session.isTransactional())
+        {
+            _msgTxnEnqueues.incrementAndGet();
+            _byteTxnEnqueues.addAndGet(message.getSize());
+        }
+    }
 
     private void deliverMessage(final Subscription sub, final QueueEntry entry)
             throws AMQException
@@ -2064,6 +2078,16 @@
     {
         return _dequeueSize.get();
     }
+    
+    public long getByteTxnEnqueues()
+    {
+        return _byteTxnEnqueues.get();
+    }
+    
+    public long getMsgTxnEnqueues()
+    {
+        return _msgTxnEnqueues.get();
+    }
 
     public long getPersistentByteEnqueues()
     {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=918941&r1=918940&r2=918941&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Thu Mar  4 11:18:30 2010
@@ -388,6 +388,14 @@
             sub.releaseSendLock();
         }
     }
+    
+    public boolean isTransactional()
+    {
+        // this does not look great but there should only be one "non-transactional"
+        // transactional context, while there could be several transactional ones in
+        // theory
+        return !(_transaction instanceof AutoCommitTransaction);
+    }
 
     public void selectTx()
     {

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=918941&r1=918940&r2=918941&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java Thu Mar  4 11:18:30 2010
@@ -552,4 +552,14 @@
     {
         return 0;
     }
+
+    public long getByteTxnEnqueues()
+    {
+        return 0;
+    }
+
+    public long getMsgTxnEnqueues()
+    {
+        return 0;
+    }
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org