You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2011/08/15 11:43:21 UTC

svn commit: r1157750 [2/5] - in /qpid/branches/rg-amqp-1-0-sandbox/qpid/java: ./ broker-plugins/experimental/shutdown/src/main/java/ broker/ broker/bin/ broker/src/main/java/org/apache/qpid/qmf/ broker/src/main/java/org/apache/qpid/server/ broker/src/m...

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1157750&r1=1157749&r2=1157750&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Mon Aug 15 09:43:16 2011
@@ -358,15 +358,6 @@ public class QueueEntryImpl implements Q
         }
     }
 
-    public void requeue(Subscription subscription)
-    {
-        getQueue().requeue(this, subscription);
-        if(_stateChangeListeners != null)
-        {
-            notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
-        }
-    }
-
     public void dequeue()
     {
         EntryState state = _state;

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1157750&r1=1157749&r2=1157750&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Mon Aug 15 09:43:16 2011
@@ -849,24 +849,6 @@ public class SimpleAMQQueue implements A
 
     }
 
-    public void requeue(QueueEntryImpl entry, Subscription subscription)
-    {
-        SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
-        // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
-        while (subscriberIter.advance())
-        {
-            Subscription sub = subscriberIter.getNode().getSubscription();
-
-            // we don't make browsers send the same stuff twice
-            if (sub.seesRequeues() && (!sub.acquires() && sub == subscription))
-            {
-                updateSubRequeueEntry(sub, entry);
-            }
-        }
-
-        deliverAsync();
-    }
-
     public void dequeue(QueueEntry entry, Subscription sub)
     {
         decrementQueueCount();
@@ -1629,7 +1611,7 @@ public class SimpleAMQQueue implements A
 
     public void deliverAsync()
     {
-        Runner runner = new Runner(_stateChangeCount.incrementAndGet());
+        QueueRunner runner = new QueueRunner(this, _stateChangeCount.incrementAndGet());
 
         if (_asynchronousRunner.compareAndSet(null, runner))
         {
@@ -1648,52 +1630,6 @@ public class SimpleAMQQueue implements A
         _asyncDelivery.execute(flusher);
     }
 
-
-    private class Runner implements ReadWriteRunnable
-    {
-        String _name;
-        public Runner(long count)
-        {
-            _name = "QueueRunner-" + count + "-" + _logActor;
-        }
-
-        public void run()
-        {
-            String originalName = Thread.currentThread().getName();
-            try
-            {
-                Thread.currentThread().setName(_name);
-                CurrentActor.set(_logActor);
-
-                processQueue(this);
-            }
-            catch (AMQException e)
-            {
-                _logger.error(e);
-            }
-            finally
-            {
-                CurrentActor.remove();
-                Thread.currentThread().setName(originalName);
-            }
-        }
-
-        public boolean isRead()
-        {
-            return false;
-        }
-
-        public boolean isWrite()
-        {
-            return true;
-        }
-
-        public String toString()
-        {
-            return _name;
-        }
-    }
-
     public void flushSubscription(Subscription sub) throws AMQException
     {
         // Access control
@@ -1850,14 +1786,40 @@ public class SimpleAMQQueue implements A
     }
 
 
-    private void processQueue(Runnable runner) throws AMQException
+    /**
+     * Used by queue Runners to asynchronously deliver messages to consumers.
+     *
+     * A queue Runner is started whenever a state change occurs, e.g. when a new
+     * message arrives on the queue and cannot be immediately delivered to a
+     * subscription (i.e. asynchronous delivery is required). Unless there are
+     * SubFlushRunners operating (due to subscriptions unsuspending) which are
+     * capable of accepting/delivering all messages then these messages would
+     * otherwise remain on the queue.
+     *
+     * processQueue should be running while there are messages on the queue AND
+     * there are subscriptions that can deliver them. If there are no
+     * subscriptions capable of delivering the remaining messages on the queue
+     * then processQueue should stop to prevent spinning.
+     *
+     * Since processQueue runs in a fixed size Executor, it should not run
+     * indefinitely, to avoid starving other tasks of CPU (e.g. jobs to process
+     * incoming messages may not be able to be scheduled in the thread pool
+     * because all threads are working on clearing down large queues). To solve
+     * this problem, after an arbitrary number of message deliveries the
+     * processQueue job stops iterating, resubmits itself to the executor, and
+     * ends the current instance.
+     *
+     * @param runner the Runner to schedule
+     * @throws AMQException
+     */
+    public void processQueue(QueueRunner runner) throws AMQException
     {
         long stateChangeCount;
         long previousStateChangeCount = Long.MIN_VALUE;
         boolean deliveryIncomplete = true;
 
-        int extraLoops = 1;
-        long iterations = MAX_ASYNC_DELIVERIES;
+        boolean lastLoop = false;
+        int iterations = MAX_ASYNC_DELIVERIES;
 
         _asynchronousRunner.compareAndSet(runner, null);
 
@@ -1874,12 +1836,14 @@ public class SimpleAMQQueue implements A
 
             if (previousStateChangeCount != stateChangeCount)
             {
-                extraLoops = 1;
+                //further asynchronous delivery is required since the
+                //previous loop. keep going if iteration slicing allows.
+                lastLoop = false;
             }
 
             previousStateChangeCount = stateChangeCount;
-            deliveryIncomplete = _subscriptionList.size() != 0;
-            boolean done;
+            boolean allSubscriptionsDone = true;
+            boolean subscriptionDone;
 
             SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator();
             //iterate over the subscribers and try to advance their pointer
@@ -1889,29 +1853,24 @@ public class SimpleAMQQueue implements A
                 sub.getSendLock();
                 try
                 {
-
-                    done = attemptDelivery(sub);
-
-                    if (done)
+                    //attempt delivery. returns true if no further delivery currently possible to this sub
+                    subscriptionDone = attemptDelivery(sub);
+                    if (subscriptionDone)
                     {
-                        if (extraLoops == 0)
-                        {
                             if(getNextAvailableEntry(sub) == null)
-                            {
+                        {
                                 sub.queueEmpty();
-                            }
+                        }
                             deliveryIncomplete = false;
 
-                        }
-                        else
-                        {
-                            extraLoops--;
-                        }
                     }
                     else
                     {
+                        //this subscription can accept additional deliveries, so we must 
+                        //keep going after this (if iteration slicing allows it)
+                        allSubscriptionsDone = false;
+                        lastLoop = false;
                         iterations--;
-                        extraLoops = 1;
                     }
                 }
                 finally
@@ -1919,10 +1878,34 @@ public class SimpleAMQQueue implements A
                     sub.releaseSendLock();
                 }
             }
+
+            if(allSubscriptionsDone && lastLoop)
+            {
+                //We have done an extra loop already and there are again
+                //no further delivery attempts possible, only
+                //keep going if state change demands it.
+                deliveryIncomplete = false;
+            }
+            else if(allSubscriptionsDone)
+            {
+                //All subscriptions reported being done, but we have to do
+                //an extra loop if the iterations are not exhausted and
+                //there is still any work to be done
+                deliveryIncomplete = _subscriptionList.size() != 0;
+                lastLoop = true;
+            }
+            else
+            {
+                //some subscriptions can still accept more messages,
+                //keep going if iteration count allows.
+                lastLoop = false;
+                deliveryIncomplete = true;
+            }
+
             _asynchronousRunner.set(null);
         }
 
-        // If deliveries == 0 then the limitting factor was the time-slicing rather than available messages or credit
+        // If iterations == 0 then the limiting factor was the time-slicing rather than available messages or credit
         // therefore we should schedule this runner again (unless someone beats us to it :-) ).
         if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner))
         {
@@ -2283,4 +2266,9 @@ public class SimpleAMQQueue implements A
             }
         }
     }
+
+    public LogActor getLogActor()
+    {
+        return _logActor;
+    }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java?rev=1157750&r1=1157749&r2=1157750&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java Mon Aug 15 09:43:16 2011
@@ -23,6 +23,8 @@ package org.apache.qpid.server.registry;
 import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.UUID;
 
 import org.apache.commons.configuration.ConfigurationException;
@@ -41,11 +43,12 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.logging.CompositeStartupMessageLogger;
 import org.apache.qpid.server.logging.Log4jMessageLogger;
 import org.apache.qpid.server.logging.RootMessageLogger;
-import org.apache.qpid.server.logging.AbstractRootMessageLogger;
 import org.apache.qpid.server.logging.SystemOutMessageLogger;
+import org.apache.qpid.server.logging.actors.AbstractActor;
 import org.apache.qpid.server.logging.actors.BrokerActor;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.messages.BrokerMessages;
+import org.apache.qpid.server.logging.messages.VirtualHostMessages;
 import org.apache.qpid.server.management.ManagedObjectRegistry;
 import org.apache.qpid.server.management.NoopManagedObjectRegistry;
 import org.apache.qpid.server.plugins.PluginManager;
@@ -54,6 +57,7 @@ import org.apache.qpid.server.security.a
 import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
 import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
 import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
+import org.apache.qpid.server.stats.StatisticsCounter;
 import org.apache.qpid.server.transport.QpidAcceptor;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -104,6 +108,10 @@ public abstract class ApplicationRegistr
     private ConfigStore _configStore;
 
     protected String _registryName;
+    
+    private Timer _reportingTimer;
+    private boolean _statisticsEnabled = false;
+    private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
 
     static
     {
@@ -294,6 +302,8 @@ public abstract class ApplicationRegistr
         try
         {
             initialiseVirtualHosts();
+            initialiseStatistics();
+            initialiseStatisticsReporting();
         }
         finally
         {
@@ -320,6 +330,72 @@ public abstract class ApplicationRegistr
     {
         _managedObjectRegistry = new NoopManagedObjectRegistry();
     }
+    
+    public void initialiseStatisticsReporting()
+    {
+        long report = _configuration.getStatisticsReportingPeriod() * 1000; // convert to ms
+        final boolean broker = _configuration.isStatisticsGenerationBrokerEnabled();
+        final boolean virtualhost = _configuration.isStatisticsGenerationVirtualhostsEnabled();
+        final boolean reset = _configuration.isStatisticsReportResetEnabled();
+        
+        /* add a timer task to report statistics if generation is enabled for broker or virtualhosts */
+        if (report > 0L && (broker || virtualhost))
+        {
+            _reportingTimer = new Timer("Statistics-Reporting", true);
+            
+            class StatisticsReportingTask extends TimerTask
+            {
+                private final int DELIVERED = 0;
+                private final int RECEIVED = 1;
+                
+                public void run()
+                {
+                    CurrentActor.set(new AbstractActor(ApplicationRegistry.getInstance().getRootMessageLogger()) {
+                        public String getLogMessage()
+                        {
+                            return "[" + Thread.currentThread().getName() + "] ";
+                        }
+                    });
+                    
+                    if (broker)
+                    {
+                        CurrentActor.get().message(BrokerMessages.STATS_DATA(DELIVERED, _dataDelivered.getPeak() / 1024.0, _dataDelivered.getTotal()));
+                        CurrentActor.get().message(BrokerMessages.STATS_MSGS(DELIVERED, _messagesDelivered.getPeak(), _messagesDelivered.getTotal()));
+                        CurrentActor.get().message(BrokerMessages.STATS_DATA(RECEIVED, _dataReceived.getPeak() / 1024.0, _dataReceived.getTotal()));
+                        CurrentActor.get().message(BrokerMessages.STATS_MSGS(RECEIVED, _messagesReceived.getPeak(), _messagesReceived.getTotal()));
+                    }
+                    
+                    if (virtualhost)
+                    {
+                        for (VirtualHost vhost : getVirtualHostRegistry().getVirtualHosts())
+                        {
+                            String name = vhost.getName();
+                            StatisticsCounter dataDelivered = vhost.getDataDeliveryStatistics();
+                            StatisticsCounter messagesDelivered = vhost.getMessageDeliveryStatistics();
+                            StatisticsCounter dataReceived = vhost.getDataReceiptStatistics();
+                            StatisticsCounter messagesReceived = vhost.getMessageReceiptStatistics();
+                            
+                            CurrentActor.get().message(VirtualHostMessages.STATS_DATA(name, DELIVERED, dataDelivered.getPeak() / 1024.0, dataDelivered.getTotal()));
+                            CurrentActor.get().message(VirtualHostMessages.STATS_MSGS(name, DELIVERED, messagesDelivered.getPeak(), messagesDelivered.getTotal()));
+                            CurrentActor.get().message(VirtualHostMessages.STATS_DATA(name, RECEIVED, dataReceived.getPeak() / 1024.0, dataReceived.getTotal()));
+                            CurrentActor.get().message(VirtualHostMessages.STATS_MSGS(name, RECEIVED, messagesReceived.getPeak(), messagesReceived.getTotal()));
+                        }
+                    }
+                    
+                    if (reset)
+                    {
+                        resetStatistics();
+                    }
+
+                    CurrentActor.remove();
+                }
+            }
+
+            _reportingTimer.scheduleAtFixedRate(new StatisticsReportingTask(),
+                                                report / 2,
+                                                report);
+        }
+    }
 
     public static IApplicationRegistry getInstance()
     {
@@ -369,6 +445,12 @@ public abstract class ApplicationRegistr
         {
             _logger.info("Shutting down ApplicationRegistry:" + this);
         }
+        
+        //Stop Statistics Reporting
+        if (_reportingTimer != null)
+        {
+            _reportingTimer.cancel();
+        }
 
         //Stop incoming connections
         unbind();
@@ -498,4 +580,76 @@ public abstract class ApplicationRegistr
         getBroker().addVirtualHost(virtualHost);
         return virtualHost;
     }
+    
+    public void registerMessageDelivered(long messageSize)
+    {
+        if (isStatisticsEnabled())
+        {
+            _messagesDelivered.registerEvent(1L);
+            _dataDelivered.registerEvent(messageSize);
+        }
+    }
+    
+    public void registerMessageReceived(long messageSize, long timestamp)
+    {
+        if (isStatisticsEnabled())
+        {
+            _messagesReceived.registerEvent(1L, timestamp);
+            _dataReceived.registerEvent(messageSize, timestamp);
+        }
+    }
+    
+    public StatisticsCounter getMessageReceiptStatistics()
+    {
+        return _messagesReceived;
+    }
+    
+    public StatisticsCounter getDataReceiptStatistics()
+    {
+        return _dataReceived;
+    }
+    
+    public StatisticsCounter getMessageDeliveryStatistics()
+    {
+        return _messagesDelivered;
+    }
+    
+    public StatisticsCounter getDataDeliveryStatistics()
+    {
+        return _dataDelivered;
+    }
+    
+    public void resetStatistics()
+    {
+        _messagesDelivered.reset();
+        _dataDelivered.reset();
+        _messagesReceived.reset();
+        _dataReceived.reset();
+        
+        for (VirtualHost vhost : _virtualHostRegistry.getVirtualHosts())
+        {
+            vhost.resetStatistics();
+        }
+    }
+
+    public void initialiseStatistics()
+    {
+        setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
+                getConfiguration().isStatisticsGenerationBrokerEnabled());
+        
+        _messagesDelivered = new StatisticsCounter("messages-delivered");
+        _dataDelivered = new StatisticsCounter("bytes-delivered");
+        _messagesReceived = new StatisticsCounter("messages-received");
+        _dataReceived = new StatisticsCounter("bytes-received");
+    }
+
+    public boolean isStatisticsEnabled()
+    {
+        return _statisticsEnabled;
+    }
+
+    public void setStatisticsEnabled(boolean enabled)
+    {
+        _statisticsEnabled = enabled;
+    }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java?rev=1157750&r1=1157749&r2=1157750&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java Mon Aug 15 09:43:16 2011
@@ -35,11 +35,12 @@ import org.apache.qpid.server.plugins.Pl
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
 import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.stats.StatisticsGatherer;
 import org.apache.qpid.server.transport.QpidAcceptor;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
 
-public interface IApplicationRegistry
+public interface IApplicationRegistry extends StatisticsGatherer
 {
     /**
      * Initialise the application registry. All initialisation must be done in this method so that any components
@@ -97,4 +98,6 @@ public interface IApplicationRegistry
     ConfigStore getConfigStore();
 
     void setConfigStore(ConfigStore store);
+    
+    void initialiseStatisticsReporting();
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java?rev=1157750&r1=1157749&r2=1157750&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java Mon Aug 15 09:43:16 2011
@@ -45,9 +45,10 @@ public class AmqPlainSaslServerFactory i
 
     public String[] getMechanismNames(Map props)
     {
-        if (props != null && (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
-            props.containsKey(Sasl.POLICY_NODICTIONARY) ||
-            props.containsKey(Sasl.POLICY_NOACTIVE)))
+        if (props != null &&
+            (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
+             props.containsKey(Sasl.POLICY_NODICTIONARY) ||
+             props.containsKey(Sasl.POLICY_NOACTIVE)))
         {
             // returned array must be non null according to interface documentation
             return new String[0];

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java?rev=1157750&r1=1157749&r2=1157750&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java Mon Aug 15 09:43:16 2011
@@ -47,10 +47,11 @@ public class AnonymousSaslServerFactory 
 
     public String[] getMechanismNames(Map props)
     {
-        if (props != null && (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
-            props.containsKey(Sasl.POLICY_NODICTIONARY) ||
-            props.containsKey(Sasl.POLICY_NOACTIVE) ||
-            props.containsKey(Sasl.POLICY_NOANONYMOUS)))
+        if (props != null &&
+            (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
+             props.containsKey(Sasl.POLICY_NODICTIONARY) ||
+             props.containsKey(Sasl.POLICY_NOACTIVE) ||
+             props.containsKey(Sasl.POLICY_NOANONYMOUS)))
         {
             // returned array must be non null according to interface documentation
             return new String[0];
@@ -60,4 +61,4 @@ public class AnonymousSaslServerFactory 
             return new String[]{AnonymousSaslServer.MECHANISM};
         }
     }
-}
\ No newline at end of file
+}

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HexInitialiser.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HexInitialiser.java?rev=1157750&r1=1157749&r2=1157750&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HexInitialiser.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HexInitialiser.java Mon Aug 15 09:43:16 2011
@@ -70,7 +70,7 @@ public class CRAMMD5HexInitialiser exten
             for (char c : password)
             {
                 //toHexString does not prepend 0 so we have to
-                if (((byte) c > -1) && (byte) c < 10)
+                if (((byte) c > -1) && (byte) c < 0x10 )
                 {
                     sb.append(0);
                 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java?rev=1157750&r1=1157749&r2=1157750&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java Mon Aug 15 09:43:16 2011
@@ -45,9 +45,10 @@ public class PlainSaslServerFactory impl
 
     public String[] getMechanismNames(Map props)
     {
-        if (props != null && (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
-            props.containsKey(Sasl.POLICY_NODICTIONARY) ||
-            props.containsKey(Sasl.POLICY_NOACTIVE)))
+        if (props != null &&
+            (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
+             props.containsKey(Sasl.POLICY_NODICTIONARY) ||
+             props.containsKey(Sasl.POLICY_NOACTIVE)))
         {
             // returned array must be non null according to interface documentation
             return new String[0];

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java?rev=1157750&r1=1157749&r2=1157750&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java Mon Aug 15 09:43:16 2011
@@ -97,7 +97,6 @@ public class Subscription_0_10 implement
 
     private FlowCreditManager_0_10 _creditManager;
 
-
     private StateListener _stateListener = new StateListener()
                                             {
 
@@ -431,7 +430,7 @@ public class Subscription_0_10 implement
             Struct[] headers = new Struct[] { deliveryProps, messageProps };
 
             BasicContentHeaderProperties properties =
-                    (BasicContentHeaderProperties) message_0_8.getContentHeaderBody().properties;
+                    (BasicContentHeaderProperties) message_0_8.getContentHeaderBody().getProperties();
             final AMQShortString exchange = message_0_8.getMessagePublishInfo().getExchange();
             if(exchange != null)
             {

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java?rev=1157750&r1=1157749&r2=1157750&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java Mon Aug 15 09:43:16 2011
@@ -24,6 +24,9 @@ import static org.apache.qpid.server.log
 
 import java.text.MessageFormat;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.protocol.AMQConstant;
@@ -35,12 +38,16 @@ import org.apache.qpid.server.logging.ac
 import org.apache.qpid.server.logging.messages.ConnectionMessages;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.stats.StatisticsCounter;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionCloseCode;
 import org.apache.qpid.transport.ExecutionErrorCode;
 import org.apache.qpid.transport.ExecutionException;
 import org.apache.qpid.transport.Method;
 import org.apache.qpid.transport.ProtocolEvent;
+import org.apache.qpid.transport.Session;
 
 public class ServerConnection extends Connection implements AMQConnectionModel, LogSubject
 {
@@ -49,11 +56,20 @@ public class ServerConnection extends Co
     private AtomicBoolean _logClosed = new AtomicBoolean(false);
     private LogActor _actor = GenericActor.getInstance(this);
 
+    private ApplicationRegistry _registry;
+    private boolean _statisticsEnabled = false;
+    private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
+    
     public ServerConnection()
     {
 
     }
 
+    public UUID getId()
+    {
+        return _config.getId();
+    }
+
     @Override
     protected void invoke(Method method)
     {
@@ -72,8 +88,18 @@ public class ServerConnection extends Co
                 _onOpenTask.run();    
             }
             _actor.message(ConnectionMessages.OPEN(getClientId(), "0-10", true, true));
+
+            getVirtualHost().getConnectionRegistry().registerConnection(this);
         }
-        
+
+        if (state == State.CLOSE_RCVD || state == State.CLOSED || state == State.CLOSING)
+        {
+            if(_virtualHost != null)
+            {
+                _virtualHost.getConnectionRegistry().deregisterConnection(this);
+            }
+        }
+
         if (state == State.CLOSED)
         {
             logClosed();
@@ -110,6 +136,8 @@ public class ServerConnection extends Co
     public void setVirtualHost(VirtualHost virtualHost)
     {
         _virtualHost = virtualHost;
+        
+        initialiseStatistics(); // must follow the assignment above: initialiseStatistics() reads its configuration through _virtualHost
     }
 
     public void setConnectionConfig(final ConnectionConfig config)
@@ -145,6 +173,11 @@ public class ServerConnection extends Co
 
         ((ServerSession)session).close();
     }
+    
+    public LogSubject getLogSubject() // this connection acts as its own log subject
+    {
+        return this;
+    }
 
     @Override
     public void received(ProtocolEvent event)
@@ -215,4 +248,99 @@ public class ServerConnection extends Co
     {
         return _actor;
     }
+
+    @Override
+    public void close(AMQConstant cause, String message) throws AMQException
+    {
+        ConnectionCloseCode closeCode = ConnectionCloseCode.NORMAL; // fallback used when the cause cannot be mapped
+        try
+        {
+            closeCode = ConnectionCloseCode.get(cause.getCode());
+        }
+        catch (IllegalArgumentException ignored)
+        {
+            // Deliberately ignored: presumably the AMQConstant code has no matching ConnectionCloseCode, so NORMAL is kept
+        }
+        close(closeCode, message);
+    }
+
+    @Override
+    public List<AMQSessionModel> getSessionModels()
+    {
+        final List<AMQSessionModel> models = new ArrayList<AMQSessionModel>(); // a fresh list is built on every call
+        for (final Session channel : getChannels())
+        {
+            models.add((AMQSessionModel) channel); // presumably every channel is a ServerSession (which implements AMQSessionModel) - confirm
+        }
+        return models;
+    }
+
+    public void registerMessageDelivered(long messageSize) // records one delivered message of the given size
+    {
+        if (isStatisticsEnabled()) // per-connection counters are only touched when statistics are enabled
+        {
+            _messagesDelivered.registerEvent(1L); // message count
+            _dataDelivered.registerEvent(messageSize); // data volume
+        }
+        _virtualHost.registerMessageDelivered(messageSize); // always forwarded to the virtual host, whatever the local flag says
+    }
+
+    public void registerMessageReceived(long messageSize, long timestamp) // records one received message of the given size at the given time
+    {
+        if (isStatisticsEnabled()) // per-connection counters are only touched when statistics are enabled
+        {
+            _messagesReceived.registerEvent(1L, timestamp); // message count
+            _dataReceived.registerEvent(messageSize, timestamp); // data volume
+        }
+        _virtualHost.registerMessageReceived(messageSize, timestamp); // always forwarded to the virtual host, whatever the local flag says
+    }
+    
+    public StatisticsCounter getMessageReceiptStatistics() // counter fed with 1 per message by registerMessageReceived()
+    {
+        return _messagesReceived; // assigned in initialiseStatistics()
+    }
+    
+    public StatisticsCounter getDataReceiptStatistics() // counter fed with the message size by registerMessageReceived()
+    {
+        return _dataReceived; // assigned in initialiseStatistics()
+    }
+    
+    public StatisticsCounter getMessageDeliveryStatistics() // counter fed with 1 per message by registerMessageDelivered()
+    {
+        return _messagesDelivered; // assigned in initialiseStatistics()
+    }
+    
+    public StatisticsCounter getDataDeliveryStatistics() // counter fed with the message size by registerMessageDelivered()
+    {
+        return _dataDelivered; // assigned in initialiseStatistics()
+    }
+    
+    public void resetStatistics() // resets all four per-connection counters, whether or not statistics are enabled
+    {
+        _messagesDelivered.reset(); // NOTE(review): the counters are only assigned in initialiseStatistics(); assumes that has already run - confirm
+        _dataDelivered.reset();
+        _messagesReceived.reset();
+        _dataReceived.reset();
+    }
+
+    public void initialiseStatistics() // sets the enabled flag and (re)creates the four per-connection counters
+    {
+        setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS && // global switch first, then the connection-level configuration flag
+                _virtualHost.getApplicationRegistry().getConfiguration().isStatisticsGenerationConnectionsEnabled());
+        
+        _messagesDelivered = new StatisticsCounter("messages-delivered-" + getConnectionId()); // counters are created even when statistics are disabled
+        _dataDelivered = new StatisticsCounter("data-delivered-" + getConnectionId());
+        _messagesReceived = new StatisticsCounter("messages-received-" + getConnectionId());
+        _dataReceived = new StatisticsCounter("data-received-" + getConnectionId());
+    }
+
+    public boolean isStatisticsEnabled() // gate checked by registerMessageDelivered() and registerMessageReceived()
+    {
+        return _statisticsEnabled; // false until initialiseStatistics() or setStatisticsEnabled() changes it
+    }
+
+    public void setStatisticsEnabled(boolean enabled) // only flips the flag; the counters themselves are left untouched
+    {
+        _statisticsEnabled = enabled;
+    }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java?rev=1157750&r1=1157749&r2=1157750&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java Mon Aug 15 09:43:16 2011
@@ -20,26 +20,28 @@
  */
 package org.apache.qpid.server.transport;
 
-import org.apache.qpid.transport.*;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.GenericActor;
-import org.apache.qpid.common.ClientProperties;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
+
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
 import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-
-import javax.security.sasl.SaslServer;
-import javax.security.sasl.SaslException;
-import java.util.*;
+import org.apache.qpid.transport.*;
 
 public class ServerConnectionDelegate extends ServerDelegate
 {
     private String _localFQDN;
     private final IApplicationRegistry _appRegistry;
 
-
     public ServerConnectionDelegate(IApplicationRegistry appRegistry, String localFQDN)
     {
         this(new HashMap<String,Object>(Collections.singletonMap("qpid.federation_tag",appRegistry.getBroker().getFederationTag())), Collections.singletonList((Object)"en_US"), appRegistry, localFQDN);
@@ -138,6 +140,7 @@ public class ServerConnectionDelegate ex
             sconn.invoke(new ConnectionClose(ConnectionCloseCode.INVALID_PATH, "Unknown virtualhost '"+vhostName+"'"));
             sconn.setState(Connection.State.CLOSING);
         }
+        
     }
     
     @Override

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1157750&r1=1157749&r2=1157750&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Mon Aug 15 09:43:16 2011
@@ -20,12 +20,26 @@
  */
 package org.apache.qpid.server.transport;
 
-import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
-import static org.apache.qpid.util.Serial.gt;
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.*;
+import static org.apache.qpid.util.Serial.*;
 
-import com.sun.security.auth.UserPrincipal;
+import java.lang.ref.WeakReference;
+import java.security.Principal;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.ProtocolEngine;
 import org.apache.qpid.server.configuration.ConfigStore;
 import org.apache.qpid.server.configuration.ConfiguredObject;
@@ -38,6 +52,8 @@ import org.apache.qpid.server.logging.ac
 import org.apache.qpid.server.logging.actors.GenericActor;
 import org.apache.qpid.server.logging.messages.ChannelMessages;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.QueueEntry;
@@ -48,8 +64,6 @@ import org.apache.qpid.server.txn.AutoCo
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.transport.Binary;
 import org.apache.qpid.transport.Connection;
 import org.apache.qpid.transport.MessageTransfer;
@@ -58,24 +72,15 @@ import org.apache.qpid.transport.Range;
 import org.apache.qpid.transport.RangeSet;
 import org.apache.qpid.transport.Session;
 import org.apache.qpid.transport.SessionDelegate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.lang.ref.WeakReference;
-import java.security.Principal;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicLong;
+import com.sun.security.auth.UserPrincipal;
 
 public class ServerSession extends Session implements PrincipalHolder, SessionConfig, AMQSessionModel, LogSubject
 {
+    private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class);
+    
     private static final String NULL_DESTINTATION = UUID.randomUUID().toString();
 
     private final UUID _id;
@@ -111,6 +116,7 @@ public class ServerSession extends Sessi
     private final AtomicLong _txnCommits = new AtomicLong(0);
     private final AtomicLong _txnRejects = new AtomicLong(0);
     private final AtomicLong _txnCount = new AtomicLong(0);
+    private final AtomicLong _txnUpdateTime = new AtomicLong(0);
 
     private Principal _principal;
 
@@ -141,7 +147,7 @@ public class ServerSession extends Sessi
         _connectionConfig = connConfig;
         _transaction = new AutoCommitTransaction(this.getMessageStore());
         _principal = new UserPrincipal(connection.getAuthorizationID());
-        _reference = new WeakReference(this);
+        _reference = new WeakReference<Session>(this);
         _id = getConfigStore().createId();
         getConfigStore().addConfiguredObject(this);
     }
@@ -160,8 +166,8 @@ public class ServerSession extends Sessi
 
     public void enqueue(final ServerMessage message, final ArrayList<? extends BaseQueue> queues)
     {
-
-            _transaction.enqueue(queues,message, new ServerTransaction.Action()
+        getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime());
+        _transaction.enqueue(queues,message, new ServerTransaction.Action()
             {
 
                 BaseQueue[] _queues = queues.toArray(new BaseQueue[queues.size()]);
@@ -189,6 +195,7 @@ public class ServerSession extends Sessi
             });
 
             incrementOutstandingTxnsIfNecessary();
+            updateTransactionalActivity();
     }
 
 
@@ -196,6 +203,7 @@ public class ServerSession extends Sessi
                             Runnable postIdSettingAction)
     {
         invoke(xfr, postIdSettingAction);
+        getConnectionModel().registerMessageDelivered(xfr.getBodySize());
     }
 
     public void onMessageDispositionChange(MessageTransfer xfr, MessageDispositionChangeListener acceptListener)
@@ -377,6 +385,7 @@ public class ServerSession extends Sessi
                                      entry.release();
                                  }
                              });
+	    updateTransactionalActivity();
     }
 
     public Collection<Subscription_0_10> getSubscriptions()
@@ -425,6 +434,11 @@ public class ServerSession extends Sessi
         // theory
         return !(_transaction instanceof AutoCommitTransaction);
     }
+    
+    public boolean inTransaction() // transactional session with both a recorded activity time and a transaction start time
+    {
+        return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0; // 0 means "never updated" / "not started"
+    }
 
     public void selectTx()
     {
@@ -471,6 +485,17 @@ public class ServerSession extends Sessi
         }
     }
 
+    /**
+     * Update the last transaction activity timestamp; does nothing unless the session is transactional.
+     */
+    public void updateTransactionalActivity()
+    {
+        if (isTransactional())
+        {
+            _txnUpdateTime.set(System.currentTimeMillis()); // wall-clock ms; checkTransactionStatus() derives the idle time from it
+        }
+    }
+
     public Long getTxnStarts()
     {
         return _txnStarts.get();
@@ -606,6 +631,39 @@ public class ServerSession extends Sessi
         return (LogSubject) this;
     }
 
+    public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException // thresholds in ms; a value <= 0 disables that check
+    {
+        if (inTransaction())
+        {
+            long currentTime = System.currentTimeMillis();
+            long openTime = currentTime - _transaction.getTransactionStartTime(); // ms since the transaction started
+            long idleTime = currentTime - _txnUpdateTime.get(); // ms since the last recorded transactional activity
+
+            // Log a warning on idle or open transactions (idle takes precedence; at most one warning per call)
+            if (idleWarn > 0L && idleTime > idleWarn)
+            {
+                CurrentActor.get().message(getLogSubject(), ChannelMessages.IDLE_TXN(idleTime)); // report the idle time, consistent with the warning logged below
+                _logger.warn("IDLE TRANSACTION ALERT " + getLogSubject().toString() + " " + idleTime + " ms");
+            }
+            else if (openWarn > 0L && openTime > openWarn)
+            {
+                CurrentActor.get().message(getLogSubject(), ChannelMessages.OPEN_TXN(openTime));
+                _logger.warn("OPEN TRANSACTION ALERT " + getLogSubject().toString() + " " + openTime + " ms");
+            }
+
+            // Close this session (via its connection model) for idle or open transactions that have timed out
+            if (idleClose > 0L && idleTime > idleClose)
+            {
+                getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out");
+            }
+            else if (openClose > 0L && openTime > openClose)
+            {
+                getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out");
+            }
+        }
+    }
+
+    @Override
     public String toLogString()
     {
        return "[" +
@@ -616,7 +674,5 @@ public class ServerSession extends Sessi
                                    getVirtualHost().getName(),
                                    getChannel())
             + "] ";
-
     }
-
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=1157750&r1=1157749&r2=1157750&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java Mon Aug 15 09:43:16 2011
@@ -209,26 +209,27 @@ public class ServerSessionDelegate exten
                 }
                 else
                 {
-
                     if(queue.isExclusive())
                     {
+                        ServerSession s = (ServerSession) session;
+                        queue.setExclusiveOwningSession(s);
                         if(queue.getPrincipalHolder() == null)
                         {
-                            queue.setPrincipalHolder((ServerSession)session);
+                            queue.setPrincipalHolder(s);
+                            queue.setExclusiveOwningSession(s);
                             ((ServerSession) session).addSessionCloseTask(new ServerSession.Task()
                             {
-
                                 public void doTask(ServerSession session)
                                 {
                                     if(queue.getPrincipalHolder() == session)
                                     {
                                         queue.setPrincipalHolder(null);
+                                        queue.setExclusiveOwningSession(null);
                                     }
                                 }
                             });
                         }
 
-
                     }
 
                     FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L);
@@ -369,7 +370,6 @@ public class ServerSessionDelegate exten
         }
 
         ssn.processed(xfr);
-
     }
 
     @Override
@@ -969,10 +969,10 @@ public class ServerSessionDelegate exten
 
                         }
 
-                        if(method.hasAutoDelete()
-                           && method.getAutoDelete()
-                           && method.hasExclusive()
-                           && method.getExclusive())
+                        if (method.hasAutoDelete()
+                            && method.getAutoDelete()
+                            && method.hasExclusive()
+                            && method.getExclusive())
                         {
                             final AMQQueue q = queue;
                             final ServerSession.Task deleteQueueTask = new ServerSession.Task()
@@ -999,12 +999,12 @@ public class ServerSessionDelegate exten
                                     }
                                 });
                         }
-                        else if(method.getExclusive())
+                        if (method.hasExclusive()
+                            && method.getExclusive())
                         {
                             final AMQQueue q = queue;
                             final ServerSession.Task removeExclusive = new ServerSession.Task()
                             {
-
                                 public void doTask(ServerSession session)
                                 {
                                     q.setPrincipalHolder(null);
@@ -1012,10 +1012,10 @@ public class ServerSessionDelegate exten
                                 }
                             };
                             final ServerSession s = (ServerSession) session;
+                            q.setExclusiveOwningSession(s);
                             s.addSessionCloseTask(removeExclusive);
                             queue.addQueueDeleteTask(new AMQQueue.Task()
                             {
-
                                 public void doTask(AMQQueue queue) throws AMQException
                                 {
                                     s.removeSessionCloseTask(removeExclusive);
@@ -1029,7 +1029,7 @@ public class ServerSessionDelegate exten
                     }
                 }
             }
-            else if (method.getExclusive() && (queue.getPrincipalHolder() != null && !queue.getPrincipalHolder().equals(session)))
+            else if (method.getExclusive() && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session)))
             {
                     String description = "Cannot declare queue('" + queueName + "'),"
                                                                            + " as exclusive queue with same name "

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java?rev=1157750&r1=1157749&r2=1157750&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java Mon Aug 15 09:43:16 2011
@@ -50,6 +50,11 @@ public class AutoCommitTransaction imple
         _transactionLog = transactionLog;
     }
 
+    public long getTransactionStartTime() // auto-commit has no long-lived transaction to time
+    {
+        return 0L; // always 0, the value the ServerTransaction contract uses for "not in a transaction"
+    }
+
     /**
      * Since AutoCommitTransaction have no concept of a long lived transaction, any Actions registered
      * by the caller are executed immediately.

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=1157750&r1=1157749&r2=1157750&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java Mon Aug 15 09:43:16 2011
@@ -20,18 +20,23 @@ package org.apache.qpid.server.txn;
  * 
  */
 
-
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
-import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.message.EnqueableMessage;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.TransactionLog;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A concrete implementation of ServerTransaction where enqueue/dequeue
@@ -41,17 +46,28 @@ import org.apache.qpid.server.store.Tran
  */
 public class LocalTransaction implements ServerTransaction
 {
-    protected static final Logger _logger = Logger.getLogger(LocalTransaction.class);
+    protected static final Logger _logger = LoggerFactory.getLogger(LocalTransaction.class);
 
     private final List<Action> _postTransactionActions = new ArrayList<Action>();
 
     private volatile TransactionLog.Transaction _transaction;
     private TransactionLog _transactionLog;
+    private long _txnStartTime = 0L;
 
     public LocalTransaction(TransactionLog transactionLog)
     {
         _transactionLog = transactionLog;
     }
+    
+    public boolean inTransaction() // true while a TransactionLog.Transaction is held
+    {
+        return _transaction != null; // _transaction is set back to null by resetDetails()
+    }
+    
+    public long getTransactionStartTime() // System.currentTimeMillis() value, or 0 when no start time has been recorded
+    {
+        return _txnStartTime; // reset to 0 by resetDetails()
+    }
 
     public void addPostTransactionAction(Action postTransactionAction)
     {
@@ -89,7 +105,6 @@ public class LocalTransaction implements
 
         try
         {
-
             for(QueueEntry entry : queueEntries)
             {
                 ServerMessage message = entry.getMessage();
@@ -113,7 +128,6 @@ public class LocalTransaction implements
             _logger.error("Error during message dequeues", e);
             tidyUpOnError(e);
         }
-
     }
 
     private void tidyUpOnError(Exception e)
@@ -140,8 +154,7 @@ public class LocalTransaction implements
             }
             finally
             {
-                _transaction = null;
-                _postTransactionActions.clear();
+		resetDetails();
             }
         }
 
@@ -193,8 +206,25 @@ public class LocalTransaction implements
     {
         _postTransactionActions.add(postTransactionAction);
 
+        if (_txnStartTime == 0L)
+        {
+            _txnStartTime = System.currentTimeMillis();
+        }
+
         if(message.isPersistent())
         {
+            if(_transaction == null)
+            {
+                for(BaseQueue queue : queues)
+                {
+                    if(queue.isDurable())
+                    {
+                        beginTranIfNecessary();
+                        break;
+                    }
+                }
+            }
+
             try
             {
                 for(BaseQueue queue : queues)
@@ -248,17 +278,14 @@ public class LocalTransaction implements
         }
         finally
         {
-            _transaction = null;
-            _postTransactionActions.clear();
+            resetDetails();
         }
-
     }
 
     public void rollback()
     {
         try
         {
-
             if(_transaction != null)
             {
                 _transaction.abortTran();
@@ -280,9 +307,15 @@ public class LocalTransaction implements
             }
             finally
             {
-                _transaction = null;
-                _postTransactionActions.clear();
+                resetDetails();
             }
         }
     }
+    
+    private void resetDetails() // returns this object to its "no transaction in progress" state
+    {
+        _transaction = null;
+        _postTransactionActions.clear();
+        _txnStartTime = 0L; // 0 is what getTransactionStartTime() reports when not in a transaction
+    }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java?rev=1157750&r1=1157749&r2=1157750&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java Mon Aug 15 09:43:16 2011
@@ -52,6 +52,13 @@ public interface ServerTransaction
         public void onRollback();
     }
 
+    /**
+     * Return the time at which the current transaction started.
+     * 
+     * @return the start time (LocalTransaction records System.currentTimeMillis()), or 0 if not in a transaction
+     */
+    long getTransactionStartTime();
+
     /** 
      * Register an Action for execution after transaction commit or rollback.  Actions
      * will be executed in the order in which they are registered.

Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Aug 15 09:43:16 2011
@@ -1,2 +1,3 @@
 /qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:930288
 /qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:1061302-1072333
+/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:1073294-1090000

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=1157750&r1=1157749&r2=1157750&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Mon Aug 15 09:43:16 2011
@@ -20,31 +20,29 @@
 */
 package org.apache.qpid.server.virtualhost;
 
+import java.util.UUID;
+
 import org.apache.qpid.common.Closeable;
+import org.apache.qpid.server.binding.BindingFactory;
+import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.configuration.VirtualHostConfig;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.connection.IConnectionRegistry;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.federation.BrokerLink;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.configuration.VirtualHostConfig;
-import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.protocol.v1_0.LinkRegistry;
 import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.TransactionLog;
-import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
-import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.binding.BindingFactory;
-
-import java.util.List;
-import java.util.UUID;
-import java.util.TimerTask;
-import java.util.concurrent.FutureTask;
+import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TransactionLog;
 
-public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHostConfig, Closeable
+public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHostConfig, Closeable, StatisticsGatherer
 {
     IConnectionRegistry getConnectionRegistry();
 

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java?rev=1157750&r1=1157749&r2=1157750&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java Mon Aug 15 09:43:16 2011
@@ -20,12 +20,18 @@
  */
 package org.apache.qpid.server.virtualhost;
 
-import java.util.*;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
 
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
@@ -58,6 +64,8 @@ import org.apache.qpid.server.logging.su
 import org.apache.qpid.server.management.AMQManagedObject;
 import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.protocol.v1_0.LinkRegistry;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.queue.DefaultQueueRegistry;
@@ -67,6 +75,7 @@ import org.apache.qpid.server.registry.I
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
 import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
+import org.apache.qpid.server.stats.StatisticsCounter;
 import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.MessageStore;
@@ -106,6 +115,8 @@ public class VirtualHostImpl implements 
     private BrokerConfig _broker;
     private UUID _id;
 
+    private boolean _statisticsEnabled = false;
+    private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
 
     private final long _createTime = System.currentTimeMillis();
     private final ConcurrentHashMap<BrokerLink,BrokerLink> _links = new ConcurrentHashMap<BrokerLink, BrokerLink>();
@@ -157,12 +168,12 @@ public class VirtualHostImpl implements 
 
         public String getObjectInstanceName()
         {
-            return _name.toString();
+            return ObjectName.quote(_name);
         }
 
         public String getName()
         {
-            return _name.toString();
+            return _name;
         }
 
         public VirtualHostImpl getVirtualHost()
@@ -245,6 +256,8 @@ public class VirtualHostImpl implements 
         _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
         _brokerMBean.register();
         initialiseHouseKeeping(hostConfig.getHousekeepingExpiredMessageCheckPeriod());
+        
+        initialiseStatistics();
     }
 
 	private void initialiseHouseKeeping(long period)
@@ -277,19 +290,30 @@ public class VirtualHostImpl implements 
                             // house keeping task from running.
                         }
                     }
+                    for (AMQConnectionModel connection : getConnectionRegistry().getConnections())
+                    {
+                        _logger.debug("Checking for long running open transactions on connection " + connection);
+                        for (AMQSessionModel session : connection.getSessionModels())
+                        {
+	                        _logger.debug("Checking for long running open transactions on session " + session);
+                            try
+                            {
+                                session.checkTransactionStatus(_configuration.getTransactionTimeoutOpenWarn(),
+	                                                           _configuration.getTransactionTimeoutOpenClose(),
+	                                                           _configuration.getTransactionTimeoutIdleWarn(),
+	                                                           _configuration.getTransactionTimeoutIdleClose());
+	                            }
+                            catch (Exception e)
+                            {
+                                _logger.error("Exception in housekeeping for connection: " + connection.toString(), e);
+                            }
+                        }
+                    }
                 }
             }
 
             scheduleHouseKeepingTask(period, new ExpiredMessagesTask(this));
 
-            class ForceChannelClosuresTask extends TimerTask
-            {
-                public void run()
-                {
-                    _connectionRegistry.expireClosedChannels();
-                }
-            }
-
             Map<String, VirtualHostPluginFactory> plugins =
                 ApplicationRegistry.getInstance().getPluginManager().getVirtualHostPlugins();
 
@@ -623,6 +647,80 @@ public class VirtualHostImpl implements 
     {
         return _bindingFactory;
     }
+    
+    public void registerMessageDelivered(long messageSize)
+    {
+        if (isStatisticsEnabled())
+        {
+            _messagesDelivered.registerEvent(1L);
+            _dataDelivered.registerEvent(messageSize);
+        }
+        _appRegistry.registerMessageDelivered(messageSize);
+    }
+    
+    public void registerMessageReceived(long messageSize, long timestamp)
+    {
+        if (isStatisticsEnabled())
+        {
+            _messagesReceived.registerEvent(1L, timestamp);
+            _dataReceived.registerEvent(messageSize, timestamp);
+        }
+        _appRegistry.registerMessageReceived(messageSize, timestamp);
+    }
+    
+    public StatisticsCounter getMessageReceiptStatistics()
+    {
+        return _messagesReceived;
+    }
+    
+    public StatisticsCounter getDataReceiptStatistics()
+    {
+        return _dataReceived;
+    }
+    
+    public StatisticsCounter getMessageDeliveryStatistics()
+    {
+        return _messagesDelivered;
+    }
+    
+    public StatisticsCounter getDataDeliveryStatistics()
+    {
+        return _dataDelivered;
+    }
+    
+    public void resetStatistics()
+    {
+        _messagesDelivered.reset();
+        _dataDelivered.reset();
+        _messagesReceived.reset();
+        _dataReceived.reset();
+        
+        for (AMQConnectionModel connection : _connectionRegistry.getConnections())
+        {
+            connection.resetStatistics();
+        }
+    }
+
+    public void initialiseStatistics()
+    {
+        setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
+                _appRegistry.getConfiguration().isStatisticsGenerationVirtualhostsEnabled());
+        
+        _messagesDelivered = new StatisticsCounter("messages-delivered-" + getName());
+        _dataDelivered = new StatisticsCounter("bytes-delivered-" + getName());
+        _messagesReceived = new StatisticsCounter("messages-received-" + getName());
+        _dataReceived = new StatisticsCounter("bytes-received-" + getName());
+    }
+
+    public boolean isStatisticsEnabled()
+    {
+        return _statisticsEnabled;
+    }
+
+    public void setStatisticsEnabled(boolean enabled)
+    {
+        _statisticsEnabled = enabled;
+    }
 
     public void createBrokerConnection(final String transport,
                                        final String host,

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java?rev=1157750&r1=1157749&r2=1157750&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java Mon Aug 15 09:43:16 2011
@@ -364,7 +364,7 @@ public class Show extends AbstractComman
             {
                 if(msg instanceof AMQMessage)
                 {
-                    headers = ((BasicContentHeaderProperties) ((AMQMessage)msg).getContentHeaderBody().properties);
+                    headers = ((BasicContentHeaderProperties) ((AMQMessage)msg).getContentHeaderBody().getProperties());
                 }
             }
             catch (AMQException e)

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=1157750&r1=1157749&r2=1157750&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Mon Aug 15 09:43:16 2011
@@ -276,7 +276,7 @@ public class AbstractHeadersExchangeTest
     static ContentHeaderBody getContentHeader(FieldTable headers)
     {
         ContentHeaderBody header = new ContentHeaderBody();
-        header.properties = getProperties(headers);
+        header.setProperties(getProperties(headers));
         return header;
     }
 

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java?rev=1157750&r1=1157749&r2=1157750&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java Mon Aug 15 09:43:16 2011
@@ -396,7 +396,7 @@ public class TopicExchangeTest extends I
         IncomingMessage message = new IncomingMessage(info);
         final ContentHeaderBody chb = new ContentHeaderBody();
         BasicContentHeaderProperties props = new BasicContentHeaderProperties();
-        chb.properties = props;
+        chb.setProperties(props);
         message.setContentHeaderBody(chb);
 
 

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java?rev=1157750&r1=1157749&r2=1157750&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java Mon Aug 15 09:43:16 2011
@@ -96,7 +96,7 @@ public class AMQPriorityQueueTest extend
         AMQMessage msg = super.createMessage(id);
         BasicContentHeaderProperties props = new BasicContentHeaderProperties();
         props.setPriority(i);
-        msg.getContentHeaderBody().properties = props;
+        msg.getContentHeaderBody().setProperties(props);
         return msg;
     }
 

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?rev=1157750&r1=1157749&r2=1157750&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java Mon Aug 15 09:43:16 2011
@@ -277,7 +277,7 @@ public class AMQQueueAlertTest extends I
 
         ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
         BasicContentHeaderProperties props = new BasicContentHeaderProperties();
-        contentHeaderBody.properties = props;
+        contentHeaderBody.setProperties(props);
         contentHeaderBody.bodySize = size;   // in bytes
         IncomingMessage message = new IncomingMessage(publish);
         message.setContentHeaderBody(contentHeaderBody);

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=1157750&r1=1157749&r2=1157750&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Mon Aug 15 09:43:16 2011
@@ -402,8 +402,8 @@ public class AMQQueueMBeanTest extends I
 
         ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
         contentHeaderBody.bodySize = MESSAGE_SIZE;   // in bytes
-        contentHeaderBody.properties = new BasicContentHeaderProperties();
-        ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) (persistent ? 2 : 1));
+        contentHeaderBody.setProperties(new BasicContentHeaderProperties());
+        ((BasicContentHeaderProperties) contentHeaderBody.getProperties()).setDeliveryMode((byte) (persistent ? 2 : 1));
         IncomingMessage msg = new IncomingMessage(publish);
         msg.setContentHeaderBody(contentHeaderBody);
         return msg;

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java?rev=1157750&r1=1157749&r2=1157750&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java Mon Aug 15 09:43:16 2011
@@ -126,7 +126,7 @@ public class AckTest extends InternalBro
             //IncomingMessage msg2 = null;
             BasicContentHeaderProperties b = new BasicContentHeaderProperties();
             ContentHeaderBody cb = new ContentHeaderBody();
-            cb.properties = b;
+            cb.setProperties(b);
 
             if (persistent)
             {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org