Posted to commits@qpid.apache.org by ri...@apache.org on 2007/02/06 10:57:39 UTC

svn commit: r504056 [1/5] - in /incubator/qpid/branches/perftesting_persistent: ./ qpid/gentools/src/org/apache/qpid/gentools/ qpid/java/ qpid/java/broker/etc/ qpid/java/broker/src/main/java/org/apache/qpid/server/ qpid/java/broker/src/main/java/org/ap...

Author: ritchiem
Date: Tue Feb  6 01:57:35 2007
New Revision: 504056

URL: http://svn.apache.org/viewvc?view=rev&rev=504056
Log:
Merged revisions 501413-503717 via svnmerge from 
https://svn.apache.org/repos/asf/incubator/qpid/trunk

........
  r501448 | kpvdr | 2007-01-30 16:27:47 +0000 (Tue, 30 Jan 2007) | 1 line
  
  Fixed codegen bug in which fields added by second XML file duplicate ordinal values.
........
  r501455 | rgreig | 2007-01-30 16:40:20 +0000 (Tue, 30 Jan 2007) | 1 line
  
  (Submitted by Rupert Smith) Ping tests refactored. Unused ping test classes removed. JUnit-toolkit 0.5-SNAPSHOT added to the build.
........
  r501457 | rgreig | 2007-01-30 16:42:37 +0000 (Tue, 30 Jan 2007) | 1 line
  
  (Submitted by Rupert Smith) Added PingClient.java which was forgotten from last commit.
........
  r501465 | rgreig | 2007-01-30 16:53:41 +0000 (Tue, 30 Jan 2007) | 1 line
  
  (Submitted by Rupert Smith) Updated the README.txt to give a fuller explanation for the creation of the temporary local maven repository.
........
  r501472 | kpvdr | 2007-01-30 16:59:38 +0000 (Tue, 30 Jan 2007) | 1 line
  
  Small codegen code tidy-up
........
  r501804 | rgreig | 2007-01-31 11:29:33 +0000 (Wed, 31 Jan 2007) | 3 lines
  
  (Patch submitted by Rupert Smith)
  Added a ping latency test. 
  Uploaded new junit-toolkit snapshot for self timed tests.
........
  r501914 | ritchiem | 2007-01-31 17:25:42 +0000 (Wed, 31 Jan 2007) | 3 lines
  
  QPID-334 WeakReferenceMessageHandle uses a singleton so when body is purged by gc it cannot be reset 
  
  Changed to use an ArrayList of size 1 as per JIRA entry.
........
  r501917 | ritchiem | 2007-01-31 17:31:04 +0000 (Wed, 31 Jan 2007) | 6 lines
  
  QPID-333 Message Properties on non Qpid Messages are not preserved
  Updated MessageConverter to have a constructor that takes a Message type. 
  
  Updated MessageConverterTest to use the new NonQpidMessage to test it out.
  
  JMSHeaderAdapter.java - whitespace changes and comment noting that null return is required.
........
  r501920 | ritchiem | 2007-01-31 17:43:45 +0000 (Wed, 31 Jan 2007) | 1 line
  
  Unused so removing
........
  r501945 | vinoski | 2007-01-31 19:00:26 +0000 (Wed, 31 Jan 2007) | 1 line
  
  patch from Jonathan Anstey for QPID-332
........
  r502172 | ritchiem | 2007-02-01 09:37:39 +0000 (Thu, 01 Feb 2007) | 3 lines
  
  QPID-333 Committed test class rename to stop it being picked up by Surefire
  
  AMQTopic.java - whitespace
........
  r502178 | bhupendrab | 2007-02-01 10:01:32 +0000 (Thu, 01 Feb 2007) | 1 line
  
  virtual host string corrected
........
  r502179 | rgreig | 2007-02-01 10:13:21 +0000 (Thu, 01 Feb 2007) | 1 line
  
  (Submitted by Rupert Smith) Added comments as a reminder of improvements to be made to the tests.
........
  r502180 | bhupendrab | 2007-02-01 10:13:55 +0000 (Thu, 01 Feb 2007) | 2 lines
  
  QPID-331
  and setting operation parameters to default values after executing the operation once.
........
  r502182 | rgreig | 2007-02-01 10:18:36 +0000 (Thu, 01 Feb 2007) | 1 line
  
  (Submitted by Rupert Smith) Added comments as a reminder of improvements to be made to the tests.
........
  r502248 | ritchiem | 2007-02-01 15:47:17 +0000 (Thu, 01 Feb 2007) | 7 lines
  
  QPID-339 Java client hangs when starting up (intermittently)
  
  Patched the problem where the dispatcher would hang. The previous logic was flawed.
  
  Patch worked on by Robert Godfrey and Martin Ritchie.
  
  Added test to ensure that the connection is not automatically started.
........
  r502249 | ritchiem | 2007-02-01 15:50:52 +0000 (Thu, 01 Feb 2007) | 3 lines
  
  QPID-330 Clients occasionally fail to notice connect
  
  The AMQConnection.java constructor now deals with the full connection process. The failover thread should not be started. This allows the connection method to be simplified and not Thread.sleep waiting for the connection.
........
  r502253 | ritchiem | 2007-02-01 16:01:14 +0000 (Thu, 01 Feb 2007) | 11 lines
  
  QPID-339 Java client hangs when starting up (intermittently)
  
  Patched the problem where the dispatcher would hang. The previous logic was flawed.
  
  Patch worked on by Robert Godfrey and Martin Ritchie.
  
  Added test to ensure that the connection is not automatically started.
  
  (Only added the test last time by mistake. This is the actual fix)
  
  With a test for the DispatcherTest
........
  r502261 | ritchiem | 2007-02-01 16:25:57 +0000 (Thu, 01 Feb 2007) | 2 lines
  
  QPID-339 DispatcherTest.java was broken; now it actually tests correctly.
  Added test to check changing message listeners
........
  r502268 | ritchiem | 2007-02-01 16:32:56 +0000 (Thu, 01 Feb 2007) | 1 line
  
  Increased logging on a failure to attain state
........
  r502269 | bhupendrab | 2007-02-01 16:34:21 +0000 (Thu, 01 Feb 2007) | 1 line
  
  some part commented, so that it lets users copy paste the host details on the new connection window
........
  r502271 | ritchiem | 2007-02-01 16:36:54 +0000 (Thu, 01 Feb 2007) | 3 lines
  
  QPID-341  When using Queues and Topics defined via JNDI settings are not preserved.
  
  Removed extraction of destination/queue name and used BindingURL directly to create Destination.
........
  r502273 | ritchiem | 2007-02-01 16:38:45 +0000 (Thu, 01 Feb 2007) | 2 lines
  
  Added more IntelliJ files to the ignore list
........
  r502576 | ritchiem | 2007-02-02 11:13:13 +0000 (Fri, 02 Feb 2007) | 4 lines
  
  QPID-343 Performance test suite doesn't output missing message count on failure.
  
  Updated PingAsyncTestPerf to output missing message count.
  Updated PingPongProducer so it doesn't use AMQShortStringx.
........
  r502610 | bhupendrab | 2007-02-02 14:26:32 +0000 (Fri, 02 Feb 2007) | 2 lines
  
  QPID-84
  Tests for FSContextFactory deleted. fscontext.jar is not part of Apache svn.
........
  r502620 | rgreig | 2007-02-02 15:09:08 +0000 (Fri, 02 Feb 2007) | 3 lines
  
  (Submitted by Rupert Smith) 
  Perftests improved with better timeout handling. Shared/unique destinations to ping now an option.
  TestRunner now runs all per-thread setups, synchs all threads, then runs tests, synchs all threads, then runs tear downs.
........
  r502627 | rgreig | 2007-02-02 15:31:30 +0000 (Fri, 02 Feb 2007) | 2 lines
  
  (Submitted by Rupert Smith)
  Fixed problem with losing message results. Was not passing in self generated message correlation id in the async test, to match up replies with.
........
  r502655 | rgreig | 2007-02-02 16:59:14 +0000 (Fri, 02 Feb 2007) | 1 line
  
  (Submitted by Rupert Smith) Options moved to top of constructor. Were at bottom and not being used!
........
  r503593 | ritchiem | 2007-02-05 08:58:30 +0000 (Mon, 05 Feb 2007) | 1 line
  
  Fixed bug in stop(). If a connection is opened, not start()ed, and then closed, a NullPointerException will be thrown as the Dispatcher has not been created.
........
  r503604 | rgreig | 2007-02-05 09:40:04 +0000 (Mon, 05 Feb 2007) | 1 line
  
  QPID-326 : Patch supplied by Rob Godfrey - add oldest message on queue notification, and log notifications in log file
........
  r503609 | ritchiem | 2007-02-05 09:49:59 +0000 (Mon, 05 Feb 2007) | 1 line
  
  Update to performance testing to allow the use of shared destinations. This allows topics to have multiple consumers and the total message counts updated correctly.
........
  r503637 | rgreig | 2007-02-05 11:17:08 +0000 (Mon, 05 Feb 2007) | 2 lines
  
  (Submitted by Rupert Smith)
  Junit-toolkit has now fully migrated onto sourceforge. Snapshot repository location updated.
........
  r503646 | rgreig | 2007-02-05 11:28:57 +0000 (Mon, 05 Feb 2007) | 2 lines
  
  (Submitted by Rupert Smith)
  This local repository is no longer needed. JUnit-Toolkit snapshot repository is now hosted on sourceforge: http://junit-toolkit.svn.sourceforge.net/svnroot/junit-toolkit/. A release is also in progress to the central maven repository.
........
  r503706 | bhupendrab | 2007-02-05 14:45:18 +0000 (Mon, 05 Feb 2007) | 2 lines
  
  QPID-213 
  Also the parameter selection of boolean type is made as check-boxes instead of a drop-down.
........

Added:
    incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
      - copied unchanged from r503706, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java
      - copied unchanged from r503706, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java
      - copied unchanged from r503706, incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java
      - copied unchanged from r503706, incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/test/java/org/apache/qpid/client/message/NonQpidObjectMessage.java
      - copied unchanged from r503706, incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/message/NonQpidObjectMessage.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java
      - copied unchanged from r503706, incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/NonQpidMessage.java
      - copied unchanged from r503706, incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/NonQpidMessage.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
      - copied unchanged from r503706, incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java
      - copied unchanged from r503706, incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java
Removed:
    incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/
    incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/test/java/org/apache/qpid/client/message/TestNonQpidTextMessage.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/
    incubator/qpid/branches/perftesting_persistent/qpid/java/perftests/bin/
    incubator/qpid/branches/perftesting_persistent/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingPublisher.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingSubscriber.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/perftests/src/main/java/org/apache/qpid/ping/Throttle.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/perftests/src/test/java/org/apache/qpid/ping/ThrottleTestPerf.java
Modified:
    incubator/qpid/branches/perftesting_persistent/   (props changed)
    incubator/qpid/branches/perftesting_persistent/qpid/gentools/src/org/apache/qpid/gentools/AmqpClass.java
    incubator/qpid/branches/perftesting_persistent/qpid/gentools/src/org/apache/qpid/gentools/AmqpMethod.java
    incubator/qpid/branches/perftesting_persistent/qpid/gentools/src/org/apache/qpid/gentools/Main.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/broker/etc/virtualhosts.xml
    incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/PropertiesFileInitialContextFactoryTest.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/distribution/src/main/assembly/src.xml
    incubator/qpid/branches/perftesting_persistent/qpid/java/management/eclipse-plugin/   (props changed)
    incubator/qpid/branches/perftesting_persistent/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/Constants.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ManagedBean.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ServerRegistry.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/JMXManagedObject.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/JMXServerRegistry.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/MBeanUtility.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/model/ParameterData.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/AttributesTabControl.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/MBeanTypeTabControl.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/OperationTabControl.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/perftests/pom.xml
    incubator/qpid/branches/perftesting_persistent/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/pom.xml
    incubator/qpid/branches/perftesting_persistent/qpid/java/systests/   (props changed)

Propchange: incubator/qpid/branches/perftesting_persistent/
------------------------------------------------------------------------------
--- svnmerge-integrated (original)
+++ svnmerge-integrated Tue Feb  6 01:57:35 2007
@@ -1 +1 @@
-/incubator/qpid/trunk:1-501412
+/incubator/qpid/trunk:1-503717

Modified: incubator/qpid/branches/perftesting_persistent/qpid/gentools/src/org/apache/qpid/gentools/AmqpClass.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting_persistent/qpid/gentools/src/org/apache/qpid/gentools/AmqpClass.java?view=diff&rev=504056&r1=504055&r2=504056
==============================================================================
--- incubator/qpid/branches/perftesting_persistent/qpid/gentools/src/org/apache/qpid/gentools/AmqpClass.java (original)
+++ incubator/qpid/branches/perftesting_persistent/qpid/gentools/src/org/apache/qpid/gentools/AmqpClass.java Tue Feb  6 01:57:35 2007
@@ -59,7 +59,7 @@
 			indexMap.put(index, indexVersionSet);
 		}
 		NodeList nList = classNode.getChildNodes();
-		int fieldCntr = 0;
+		int fieldCntr = fieldMap.size();
 		for (int i=0; i<nList.getLength(); i++)
 		{
 			Node child = nList.item(i);
@@ -82,7 +82,6 @@
 					thisField.removeVersion(version);
 					fieldMap.remove(fieldName);
 				}
-				fieldCntr++;
 			}
 			else if (child.getNodeName().compareTo(Utils.ELEMENT_METHOD) == 0)
 			{
@@ -94,7 +93,7 @@
 					thisMethod = new AmqpMethod(methodName, converter);
 					methodMap.put(methodName, thisMethod);
 				}			
-				if (!thisMethod.addFromNode(child, fieldCntr++, version))
+				if (!thisMethod.addFromNode(child, 0, version))
 				{
 					String className = converter.prepareClassName(Utils.getNamedAttribute(classNode,
 							Utils.ATTRIBUTE_NAME));
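
Note on the fieldCntr change above (r501448): a minimal sketch of why seeding the counter from fieldMap.size() avoids duplicate ordinals when a second XML file adds fields. It assumes fields accumulate in one map across files and that a field already present keeps its ordinal. OrdinalSketch, addFields and ordinals are hypothetical names for illustration, not gentools code.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the field bookkeeping in AmqpClass/AmqpMethod.
final class OrdinalSketch {
    // Field name -> ordinal, shared across every XML file parsed so far.
    private final Map<String, Integer> fieldMap = new LinkedHashMap<>();

    // Adds the fields declared by one XML file.
    void addFields(List<String> fieldNames) {
        // Seeding from the map size (not 0) continues the numbering after
        // the fields contributed by earlier files.
        int fieldCntr = fieldMap.size();
        for (String name : fieldNames) {
            if (!fieldMap.containsKey(name)) {
                fieldMap.put(name, fieldCntr++);
            }
        }
    }

    Map<String, Integer> ordinals() {
        return fieldMap;
    }

    public static void main(String[] args) {
        OrdinalSketch sketch = new OrdinalSketch();
        sketch.addFields(Arrays.asList("ticket", "queue"));   // first XML file
        sketch.addFields(Arrays.asList("queue", "priority")); // second XML file
        System.out.println(sketch.ordinals()); // {ticket=0, queue=1, priority=2}
    }
}
```

With the counter reset to 0 for the second file, "priority" would have received ordinal 0 and collided with "ticket".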

Modified: incubator/qpid/branches/perftesting_persistent/qpid/gentools/src/org/apache/qpid/gentools/AmqpMethod.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting_persistent/qpid/gentools/src/org/apache/qpid/gentools/AmqpMethod.java?view=diff&rev=504056&r1=504055&r2=504056
==============================================================================
--- incubator/qpid/branches/perftesting_persistent/qpid/gentools/src/org/apache/qpid/gentools/AmqpMethod.java (original)
+++ incubator/qpid/branches/perftesting_persistent/qpid/gentools/src/org/apache/qpid/gentools/AmqpMethod.java Tue Feb  6 01:57:35 2007
@@ -63,7 +63,7 @@
 			indexMap.put(index, indexVersionSet);
 		}
 		NodeList nList = methodNode.getChildNodes();
-		int fieldCntr = 0;
+		int fieldCntr = fieldMap.size();
 		for (int i=0; i<nList.getLength(); i++)
 		{
 			Node child = nList.item(i);

Modified: incubator/qpid/branches/perftesting_persistent/qpid/gentools/src/org/apache/qpid/gentools/Main.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting_persistent/qpid/gentools/src/org/apache/qpid/gentools/Main.java?view=diff&rev=504056&r1=504055&r2=504056
==============================================================================
--- incubator/qpid/branches/perftesting_persistent/qpid/gentools/src/org/apache/qpid/gentools/Main.java (original)
+++ incubator/qpid/branches/perftesting_persistent/qpid/gentools/src/org/apache/qpid/gentools/Main.java Tue Feb  6 01:57:35 2007
@@ -240,7 +240,7 @@
             new File(tmplDir + Utils.fileSeparator + "AMQP_Constants.h.tmpl"),
             new File(tmplDir + Utils.fileSeparator + "AMQP_MethodVersionMap.h.tmpl"),
             new File(tmplDir + Utils.fileSeparator + "AMQP_MethodVersionMap.cpp.tmpl"),
-           new File(tmplDir + Utils.fileSeparator + "AMQP_HighestVersion.h.tmpl")
+            new File(tmplDir + Utils.fileSeparator + "AMQP_HighestVersion.h.tmpl")
         };
         methodTemplateFiles = new File[]
         {

Modified: incubator/qpid/branches/perftesting_persistent/qpid/java/broker/etc/virtualhosts.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting_persistent/qpid/java/broker/etc/virtualhosts.xml?view=diff&rev=504056&r1=504055&r2=504056
==============================================================================
--- incubator/qpid/branches/perftesting_persistent/qpid/java/broker/etc/virtualhosts.xml (original)
+++ incubator/qpid/branches/perftesting_persistent/qpid/java/broker/etc/virtualhosts.xml Tue Feb  6 01:57:35 2007
@@ -21,18 +21,79 @@
  -->
 <virtualhosts>
     <virtualhost>
-        <path>localhost</path>
-        <bind>direct://amq.direct//queue</bind>
-        <bind>direct://amq.direct//ping</bind>
+        <name>localhost</name>
+
+        <localhost>
+            <minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
+            <maximumMessageCount>5000</maximumMessageCount>
+            <queue>
+                <name>queue</name>
+                <queue>
+                    <exchange>amq.direct</exchange>
+                    <maximumQueueDepth>4235264</maximumQueueDepth>  <!-- 4Mb -->
+                    <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+                    <maximumMessageAge>600000</maximumMessageAge>  <!-- 10 mins -->
+                </queue>
+            </queue>
+            <queue>
+                <name>ping</name>
+                <ping>
+                    <exchange>amq.direct</exchange>
+                    <maximumQueueDepth>4235264</maximumQueueDepth>  <!-- 4Mb -->
+                    <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+                    <maximumMessageAge>600000</maximumMessageAge>  <!-- 10 mins -->
+                </ping>
+            </queue>
+        </localhost>
     </virtualhost>
 	<virtualhost>
-        <path>development</path>
-        <bind>direct://amq.direct//queue</bind>
-        <bind>direct://amq.direct//ping</bind>
+        <name>development</name>
+        <minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
+        <maximumMessageCount>5000</maximumMessageCount>
+        <development>
+            <queue>
+                <name>queue</name>
+                <queue>
+                    <exchange>amq.direct</exchange>
+                    <maximumQueueDepth>4235264</maximumQueueDepth>  <!-- 4Mb -->
+                    <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+                    <maximumMessageAge>600000</maximumMessageAge>  <!-- 10 mins -->
+                </queue>
+            </queue>
+            <queue>
+                <name>ping</name>
+                <ping>
+                    <exchange>amq.direct</exchange>
+                    <maximumQueueDepth>4235264</maximumQueueDepth>  <!-- 4Mb -->
+                    <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+                    <maximumMessageAge>600000</maximumMessageAge>  <!-- 10 mins -->
+                </ping>
+            </queue>
+        </development>
     </virtualhost>
 		<virtualhost>
-        <path>test</path>
-        <bind>direct://amq.direct//queue</bind>
-        <bind>direct://amq.direct//ping</bind>
+            <name>test</name>
+            <minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
+            <maximumMessageCount>5000</maximumMessageCount>
+            <test>
+                <queue>
+                    <name>queue</name>
+                    <queue>
+                        <exchange>amq.direct</exchange>
+                        <maximumQueueDepth>4235264</maximumQueueDepth>  <!-- 4Mb -->
+                        <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+                        <maximumMessageAge>600000</maximumMessageAge>  <!-- 10 mins -->
+                    </queue>
+                </queue>
+                <queue>
+                    <name>ping</name>
+                    <ping>
+                        <exchange>amq.direct</exchange>
+                        <maximumQueueDepth>4235264</maximumQueueDepth>  <!-- 4Mb -->
+                        <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+                        <maximumMessageAge>600000</maximumMessageAge>  <!-- 10 mins -->
+                    </ping>
+                </queue>
+            </test>
     </virtualhost>
 </virtualhosts>
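
Note on the new virtualhosts.xml layout above: each <virtualhost> now carries a <name> child in place of <path>, and queue settings nest under an element named after the host. The sketch below lists the host names with the JDK's DOM parser. This is not how the broker reads the file (VirtualHostConfiguration uses Commons Configuration's XMLConfiguration); VirtualHostsSketch and hostNames are hypothetical names for illustration.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

final class VirtualHostsSketch {
    // Returns the text of each <virtualhost>'s direct <name> child, in document order.
    static List<String> hostNames(String xml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            List<String> names = new ArrayList<>();
            NodeList hosts = doc.getDocumentElement().getElementsByTagName("virtualhost");
            for (int i = 0; i < hosts.getLength(); i++) {
                // Walk direct children only, so nested <queue><name> entries are skipped.
                for (Node c = hosts.item(i).getFirstChild(); c != null; c = c.getNextSibling()) {
                    if (c instanceof Element && "name".equals(c.getNodeName())) {
                        names.add(c.getTextContent().trim());
                    }
                }
            }
            return names;
        } catch (Exception e) {
            throw new IllegalArgumentException("Could not parse virtualhosts XML", e);
        }
    }

    public static void main(String[] args) {
        String xml = "<virtualhosts>"
                + "<virtualhost><name>localhost</name>"
                + "<localhost><queue><name>ping</name></queue></localhost>"
                + "</virtualhost>"
                + "<virtualhost><name>test</name></virtualhost>"
                + "</virtualhosts>";
        System.out.println(hostNames(xml)); // [localhost, test]
    }
}
```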

Modified: incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java?view=diff&rev=504056&r1=504055&r2=504056
==============================================================================
--- incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java (original)
+++ incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java Tue Feb  6 01:57:35 2007
@@ -153,7 +153,12 @@
 
         try
         {
-            queue = new AMQQueue(new AMQShortString(queueName), durable, new AMQShortString(owner), autoDelete, getVirtualHost());
+            AMQShortString ownerShortString = null;
+            if (owner != null)
+            {
+                ownerShortString = new AMQShortString(owner);
+            }
+            queue = new AMQQueue(new AMQShortString(queueName), durable, ownerShortString, autoDelete, getVirtualHost());
             if (queue.isDurable() && !queue.isAutoDelete())
             {
                 _messageStore.createQueue(queue);
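
Note on the owner guard above: the hunk converts the owner to an AMQShortString only when one was supplied, so a queue created without an owner gets a null owner instead of a wrapped null. A minimal sketch of the same pattern follows. ShortString is a hypothetical stand-in for AMQShortString, and its rejection of null is an assumption made for illustration, not confirmed behaviour of the Qpid class.

```java
final class OwnerGuardSketch {
    // Hypothetical stand-in for AMQShortString; assumed here to reject null input.
    static final class ShortString {
        private final String value;

        ShortString(String value) {
            if (value == null) {
                throw new NullPointerException("value");
            }
            this.value = value;
        }

        @Override
        public String toString() {
            return value;
        }
    }

    // Mirrors the guard in the hunk: convert only when an owner is present.
    static ShortString toShortStringOrNull(String owner) {
        return owner == null ? null : new ShortString(owner);
    }

    public static void main(String[] args) {
        System.out.println(toShortStringOrNull("guest")); // guest
        System.out.println(toShortStringOrNull(null));    // null
    }
}
```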

Modified: incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java?view=diff&rev=504056&r1=504055&r2=504056
==============================================================================
--- incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java (original)
+++ incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java Tue Feb  6 01:57:35 2007
@@ -34,9 +34,13 @@
 import org.apache.log4j.Logger;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.CompositeConfiguration;
 
 
 import java.util.Collection;
+import java.util.List;
+import java.util.Collections;
 
 public class VirtualHostConfiguration
 {
@@ -44,11 +48,7 @@
 
     XMLConfiguration _config;
 
-    private static final String XML_VIRTUALHOST = "virtualhost";
-    private static final String XML_PATH = "path";
-    private static final String XML_BIND = "bind";
-    private static final String XML_VIRTUALHOST_PATH = "virtualhost.path";
-    private static final String XML_VIRTUALHOST_BIND = "virtualhost.bind";
+    private static final String VIRTUALHOST_PROPERTY_BASE = "virtualhost.";
 
 
     public VirtualHostConfiguration(String configFile) throws ConfigurationException
@@ -57,137 +57,66 @@
 
         _config = new XMLConfiguration(configFile);
 
-        if (_config.getProperty(XML_VIRTUALHOST_PATH) == null)
-        {
-            throw new ConfigurationException(
-                    "Virtualhost Configuration document does not contain a valid virtualhost.");
-        }
     }
 
-    public void performBindings() throws AMQException, ConfigurationException, URLSyntaxException
-    {
-        Object prop = _config.getProperty(XML_VIRTUALHOST_PATH);
-
-        if (prop instanceof Collection)
-        {
-            _logger.debug("Number of VirtualHosts: " + ((Collection) prop).size());
 
-            int virtualhosts = ((Collection) prop).size();
-            for (int vhost = 0; vhost < virtualhosts; vhost++)
-            {
-                loadVirtualHost(vhost);
-            }
-        }
-        else
-        {
-            loadVirtualHost(-1);
-        }
-    }
 
-    private void loadVirtualHost(int index) throws AMQException, ConfigurationException, URLSyntaxException
+    private void configureVirtualHost(String virtualHostName, Configuration configuration) throws ConfigurationException, AMQException
     {
-        String path = XML_VIRTUALHOST;
-
-        if (index != -1)
-        {
-            path = path + "(" + index + ")";
-        }
-
-        Object prop = _config.getProperty(path + "." + XML_PATH);
-
-        if (prop == null)
-        {
-            prop = _config.getProperty(path + "." + XML_BIND);
-            String error = "Virtual Host not defined for binding";
-
-            if (prop != null)
-            {
-                if (prop instanceof Collection)
-                {
-                    error += "s";
-                }
+        _logger.debug("Loading configuration for virtualhost: "+virtualHostName);
 
-                error += ": " + prop;
-            }
 
-            throw new ConfigurationException(error);
-        }
+        VirtualHost virtualHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(virtualHostName);
 
-        _logger.info("VirtualHost:'" + prop + "'");
-        String virtualHost = prop.toString();
 
-        prop = _config.getProperty(path + "." + XML_BIND);
-        if (prop instanceof Collection)
-        {
-            int bindings = ((Collection) prop).size();
-            _logger.debug("Number of Bindings: " + bindings);
-            for (int dest = 0; dest < bindings; dest++)
-            {
-                loadBinding(virtualHost, path, dest);
-            }
-        }
-        else
-        {
-            loadBinding(virtualHost,path, -1);
-        }
-    }
 
-    private void loadBinding(String virtualHost, String rootpath, int index) throws AMQException, ConfigurationException, URLSyntaxException
-    {
-        String path = rootpath + "." + XML_BIND;
-        if (index != -1)
+        if(virtualHost == null)
         {
-            path = path + "(" + index + ")";
+            throw new ConfigurationException("Unknown virtual host: " + virtualHostName);
         }
 
-        String bindingString = _config.getString(path);
-
-        AMQBindingURL binding = new AMQBindingURL(bindingString);
+        List queueNames = configuration.getList("queue.name");
 
-        _logger.debug("Loaded Binding:" + binding);
-
-        try
-        {
-            bind(virtualHost, binding);
-        }
-        catch (AMQException amqe)
+        for(Object queueNameObj : queueNames)
         {
-            _logger.info("Unable to bind url: " + binding);
-            throw amqe;
+            String queueName = String.valueOf(queueNameObj);
+            configureQueue(virtualHost, queueName, configuration);
         }
+
     }
 
-    private void bind(String virtualHostName, AMQBindingURL binding) throws AMQException, ConfigurationException
+    private void configureQueue(VirtualHost virtualHost, String queueNameString, Configuration configuration) throws AMQException, ConfigurationException
     {
+        CompositeConfiguration queueConfiguration = new CompositeConfiguration();
 
-        AMQShortString queueName = binding.getQueueName();
+        queueConfiguration.addConfiguration(configuration.subset("queue."+ queueNameString));
+        queueConfiguration.addConfiguration(configuration);
 
-        // This will occur if the URL is a Topic
-        if (queueName == null)
-        {
-            //todo register valid topic
-            ///queueName = binding.getDestinationName();
-            throw new AMQException("Topics cannot be bound. TODO Register valid topic");
-        }
-
-        //Get references to Broker Registries
-        VirtualHost virtualHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(virtualHostName);
         QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
         MessageStore messageStore = virtualHost.getMessageStore();
         ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
 
+
+        AMQShortString queueName = new AMQShortString(queueNameString);
+
+        AMQQueue queue;
+
         synchronized (queueRegistry)
         {
-            AMQQueue queue = queueRegistry.getQueue(queueName);
+            queue = queueRegistry.getQueue(queueName);
 
             if (queue == null)
             {
-                _logger.info("Queue '" + binding.getQueueName() + "' does not exists. Creating.");
+                _logger.info("Creating queue '" + queueName + "' on virtual host " + virtualHost.getName());
+
+                boolean durable = queueConfiguration.getBoolean("durable" ,false);
+                boolean autodelete = queueConfiguration.getBoolean("autodelete", false);
+                String owner = queueConfiguration.getString("owner", null);
 
                 queue = new AMQQueue(queueName,
-                        Boolean.parseBoolean(binding.getOption(AMQBindingURL.OPTION_DURABLE)),
-                        null /* These queues will have no owner */,
-                        false /* Therefore autodelete makes no sence */, virtualHost);
+                        durable,
+                        owner == null ? null : new AMQShortString(owner) /* null means the queue has no owner */,
+                        autodelete /* as configured; defaults to false */, virtualHost);
 
                 if (queue.isDurable())
                 {
@@ -198,27 +127,69 @@
             }
             else
             {
-                _logger.info("Queue '" + binding.getQueueName() + "' already exists not creating.");
+                _logger.info("Queue '" + queueNameString + "' already exists on virtual host "+virtualHost.getName()+", not creating.");
             }
 
-            Exchange defaultExchange = exchangeRegistry.getExchange(binding.getExchangeName());
-            synchronized (defaultExchange)
+            String exchangeName = queueConfiguration.getString("exchange", null);
+
+            Exchange exchange = exchangeRegistry.getExchange(exchangeName == null ? null : new AMQShortString(exchangeName));
+
+            if(exchange == null)
             {
-                if (defaultExchange == null)
-                {
-                    throw new ConfigurationException("Attempt to bind queue to unknown exchange:" + binding);
-                }
+                exchange = virtualHost.getExchangeRegistry().getDefaultExchange();
+            }
 
-                defaultExchange.registerQueue(queue.getName(), queue, null);
+            if (exchange == null)
+            {
+                throw new ConfigurationException("Attempt to bind queue to unknown exchange:" + exchangeName);
+            }
 
-                if (binding.getRoutingKey() == null || binding.getRoutingKey().equals(""))
+            synchronized (exchange)
+            {
+                List routingKeys = queueConfiguration.getList("routingKey");
+                if(routingKeys == null || routingKeys.isEmpty())
                 {
-                    throw new ConfigurationException("Unknown binding not specified on url:" + binding);
+                    routingKeys = Collections.singletonList(queue.getName());
                 }
 
-                queue.bind(binding.getRoutingKey(), defaultExchange);
+                for(Object routingKeyNameObj : routingKeys)
+                {
+                    AMQShortString routingKey = new AMQShortString(String.valueOf(routingKeyNameObj));
+                    exchange.registerQueue(routingKey, queue, null);
+
+                    queue.bind(routingKey, exchange);
+
+                    _logger.info("Queue '" + queue.getName() + "' bound to exchange:" + exchangeName + " RK:'" + routingKey + "'");
+                }
             }
-            _logger.info("Queue '" + queue.getName() + "' bound to exchange:" + binding.getExchangeName() + " RK:'" + binding.getRoutingKey() + "'");
+
         }
+
+
+
+        Configurator.configure(queue, queueConfiguration);
     }
+
+
+    public void performBindings() throws AMQException, ConfigurationException
+    {
+        List virtualHostNames = _config.getList(VIRTUALHOST_PROPERTY_BASE + "name");
+
+        _logger.info("Configuring " + (virtualHostNames == null ? 0 : virtualHostNames.size()) + " virtual hosts: " + virtualHostNames);
+
+        for(Object nameObject : virtualHostNames)
+        {
+            String name = String.valueOf(nameObject);
+            configureVirtualHost(name, _config.subset(VIRTUALHOST_PROPERTY_BASE + name));
+        }
+
+        if (virtualHostNames == null || virtualHostNames.isEmpty())
+        {
+            throw new ConfigurationException(
+                    "Virtualhost Configuration document does not contain a valid virtualhost.");
+        }
+    }
+
+
+
 }
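
Note on the log statement in performBindings(): in Java, '+' binds tighter than '==' and '?:', so an unparenthesised null check inside a string concatenation tests the concatenated string rather than the list. The following is a minimal, self-contained sketch of that pitfall; the class and method names are hypothetical and are not part of the commit.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class, not part of Qpid.
final class LogMessagePrecedence
{
    // Parsed as ("Configuring " + names) == null ? 0 : (names.size() + ...).
    // The concatenated string is never null, so the prefix is always lost,
    // and a null list still reaches names.size().
    static Object unparenthesised(List<?> names)
    {
        return "Configuring " + names == null ? 0 : names.size() + " virtual hosts: " + names;
    }

    // Parentheses make the null check apply to the list itself.
    static String parenthesised(List<?> names)
    {
        return "Configuring " + (names == null ? 0 : names.size()) + " virtual hosts: " + names;
    }

    public static void main(String[] args)
    {
        List<String> names = Arrays.asList("test", "dev");
        System.out.println(unparenthesised(names)); // 2 virtual hosts: [test, dev]
        System.out.println(parenthesised(names));   // Configuring 2 virtual hosts: [test, dev]
    }
}
```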

Modified: incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java?view=diff&rev=504056&r1=504055&r2=504056
==============================================================================
--- incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java (original)
+++ incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java Tue Feb  6 01:57:35 2007
@@ -67,6 +67,11 @@
         _defaultExchange = exchange;
     }
 
+    public Exchange getDefaultExchange()
+    {
+        return _defaultExchange;
+    }
+
     public void unregisterExchange(AMQShortString name, boolean inUse) throws AMQException
     {
         // TODO: check inUse argument

Modified: incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java?view=diff&rev=504056&r1=504055&r2=504056
==============================================================================
--- incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java (original)
+++ incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java Tue Feb  6 01:57:35 2007
@@ -40,4 +40,6 @@
     Exchange getExchange(AMQShortString name);
 
     void setDefaultExchange(Exchange exchange);
+
+    Exchange getDefaultExchange();
 }

Modified: incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=504056&r1=504055&r2=504056
==============================================================================
--- incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Tue Feb  6 01:57:35 2007
@@ -509,6 +509,12 @@
         _messageHandle.setRedelivered(redelivered);
     }
 
+    public long getArrivalTime()
+    {
+        return _messageHandle.getArrivalTime();
+    }
+
+
     /**
      * Called when this message is delivered to a consumer. (used to
      * implement the 'immediate' flag functionality).

Modified: incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java?view=diff&rev=504056&r1=504055&r2=504056
==============================================================================
--- incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java (original)
+++ incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java Tue Feb  6 01:57:35 2007
@@ -74,4 +74,6 @@
     void enqueue(StoreContext storeContext, Long messageId, AMQQueue queue) throws AMQException;
 
     void dequeue(StoreContext storeContext, Long messageId, AMQQueue queue) throws AMQException;
+
+    long getArrivalTime();
 }

Modified: incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=504056&r1=504055&r2=504056
==============================================================================
--- incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Tue Feb  6 01:57:35 2007
@@ -22,6 +22,7 @@
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.configuration.Configured;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.exchange.Exchange;
@@ -35,7 +36,6 @@
 import javax.management.JMException;
 import java.text.MessageFormat;
 import java.util.List;
-import java.util.ArrayList;
 import java.util.concurrent.Executor;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -130,22 +130,36 @@
     /**
      * max allowed size(KB) of a single message
      */
-    private long _maxMessageSize = 10000;
+    private long _maximumMessageSize = 10000;
 
     /**
      * max allowed number of messages on a queue.
      */
-    private Integer _maxMessageCount = 10000;
+    @Configured(path = "maximumMessageCount", defaultValue = "0")
+    public int _maximumMessageCount;
 
     /**
-     * max queue depth(KB) for the queue
+     * max queue depth for the queue
      */
-    private long _maxQueueDepth = 10000000;
+    @Configured(path = "maximumQueueDepth", defaultValue = "0")
+    public long _maximumQueueDepth = 10000000;
+
+    /**
+     * maximum message age before alerts occur
+     */
+    @Configured(path = "maximumMessageAge", defaultValue = "0")
+    public long _maximumMessageAge = 30000; //0
+
+    /**
+     * the minimum interval between sending out consecutive alerts of the same type
+     */
+    @Configured(path = "minimumAlertRepeatGap", defaultValue = "0")
+    public long _minimumAlertRepeatGap = 30000;
 
     /**
      * total messages received by the queue since startup.
      */
-    private long _totalMessagesReceived = 0;
+    public long _totalMessagesReceived = 0;
 
     public int compareTo(Object o)
     {
@@ -286,50 +300,56 @@
         return _managedObject;
     }
 
-    public Long getMaximumMessageSize()
+    public long getMaximumMessageSize()
     {
-        return _maxMessageSize;
+        return _maximumMessageSize;
     }
 
-    public void setMaximumMessageSize(Long value)
+    public void setMaximumMessageSize(long value)
     {
-        _maxMessageSize = value;
+        _maximumMessageSize = value;
     }
 
-    public Integer getConsumerCount()
+    public int getConsumerCount()
     {
         return _subscribers.size();
     }
 
-    public Integer getActiveConsumerCount()
+    public int getActiveConsumerCount()
     {
         return _subscribers.getWeight();
     }
 
-    public Long getReceivedMessageCount()
+    public long getReceivedMessageCount()
     {
         return _totalMessagesReceived;
     }
 
-    public Integer getMaximumMessageCount()
+    public int getMaximumMessageCount()
     {
-        return _maxMessageCount;
+        return _maximumMessageCount;
     }
 
-    public void setMaximumMessageCount(Integer value)
+    public void setMaximumMessageCount(int value)
     {
-        _maxMessageCount = value;
+        _maximumMessageCount = value;
     }
 
     public long getMaximumQueueDepth()
     {
-        return _maxQueueDepth;
+        return _maximumQueueDepth;
     }
 
     // Sets the queue depth, the max queue size
     public void setMaximumQueueDepth(long value)
     {
-        _maxQueueDepth = value;
+        _maximumQueueDepth = value;
+    }
+
+    public long getOldestMessageArrivalTime()
+    {
+        return _deliveryMgr.getOldestMessageArrival();
+        
     }
 
     /**
@@ -631,6 +651,24 @@
         _deleteTaskList.add(task);
     }
 
+    public long getMinimumAlertRepeatGap()
+    {
+        return _minimumAlertRepeatGap;
+    }
 
+    public void setMinimumAlertRepeatGap(long minimumAlertRepeatGap)
+    {
+        _minimumAlertRepeatGap = minimumAlertRepeatGap;
+    }
+
+    public long getMaximumMessageAge()
+    {
+        return _maximumMessageAge;
+    }
+
+    public void setMaximumMessageAge(long maximumMessageAge)
+    {
+        _maximumMessageAge = maximumMessageAge;
+    }
 
 }
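
Note on _minimumAlertRepeatGap: the field sets a minimum spacing between repeated alerts of the same type. The sketch below shows one way such a throttle can work, assuming a per-check array of last-notification times and a flag that exempts message-specific checks. The Check enum and AlertThrottleSketch class are hypothetical stand-ins, not the commit's NotificationCheck or AMQQueueMBean.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not part of Qpid.
final class AlertThrottleSketch
{
    // Stand-in for a set of alert types; message-specific checks are never throttled.
    enum Check
    {
        MESSAGE_COUNT(false), MESSAGE_SIZE(true);

        final boolean messageSpecific;

        Check(boolean messageSpecific)
        {
            this.messageSpecific = messageSpecific;
        }
    }

    private final long[] lastNotificationTimes = new long[Check.values().length];
    private final long minimumAlertRepeatGap;
    final List<String> sent = new ArrayList<String>();

    AlertThrottleSketch(long minimumAlertRepeatGap)
    {
        this.minimumAlertRepeatGap = minimumAlertRepeatGap;
    }

    // thresholdExceeded stands in for the real per-check threshold test.
    void check(long now, boolean thresholdExceeded)
    {
        final long thresholdTime = now - minimumAlertRepeatGap;
        for (Check c : Check.values())
        {
            if (c.messageSpecific || lastNotificationTimes[c.ordinal()] < thresholdTime)
            {
                if (thresholdExceeded)
                {
                    sent.add(c.name() + "@" + now);
                    lastNotificationTimes[c.ordinal()] = now;
                }
            }
        }
    }

    public static void main(String[] args)
    {
        AlertThrottleSketch t = new AlertThrottleSketch(30000);
        t.check(100000, true);
        t.check(110000, true); // MESSAGE_COUNT suppressed: only 10s since its last alert
        t.check(140000, true); // MESSAGE_COUNT fires again: 40s since its last alert
        System.out.println(t.sent);
    }
}
```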

Modified: incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?view=diff&rev=504056&r1=504055&r2=504056
==============================================================================
--- incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Tue Feb  6 01:57:35 2007
@@ -22,12 +22,12 @@
 import org.apache.qpid.server.management.MBeanConstructor;
 import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.Main;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.mina.common.ByteBuffer;
+import org.apache.log4j.Logger;
 
 import javax.management.openmbean.*;
 import javax.management.*;
@@ -41,8 +41,11 @@
  * for an AMQQueue.
  */
 @MBeanDescription("Management Interface for AMQQueue")
-public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
+public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener
 {
+
+    private static final Logger _logger = Logger.getLogger(AMQQueueMBean.class);
+
     /**
      * Since the MBean is not associated with a real channel we can safely create our own store context
      * for use in the few methods that require one.
@@ -63,6 +66,9 @@
     private final static String[] _msgContentAttributes = {"AMQ MessageId", "MimeType", "Encoding", "Content"};
     private static OpenType[] _msgContentAttributeTypes = new OpenType[4];
 
+
+    private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
+
     @MBeanConstructor("Creates an MBean exposing an AMQQueue")
     public AMQQueueMBean(AMQQueue queue) throws JMException
     {
@@ -213,38 +219,38 @@
         return msg.getContentHeaderBody().bodySize;
     }
 
+
+
     /**
      * Checks if there is any notification to be sent to the listeners
      */
     public void checkForNotification(AMQMessage msg) throws AMQException, JMException
     {
-        // Check for threshold message count
-        Integer msgCount = getMessageCount();
-        if (msgCount >= getMaximumMessageCount())
-        {
-            notifyClients("Message count(" + msgCount + ") has reached or exceeded the threshold high value");
-        }
 
-        // Check for threshold message size
-        long messageSize = getMessageSize(msg);
-        if (messageSize >= _queue.getMaximumMessageSize())
-        {
-            notifyClients("Message size(ID=" + msg.getMessageId() + ", size=" + messageSize + " bytes) is higher than the threshold value");
-        }
+        final long currentTime = System.currentTimeMillis();
+        final long thresholdTime =  currentTime - _queue.getMinimumAlertRepeatGap();
 
-        // Check for threshold queue depth in bytes
-        long queueDepth = getQueueDepthKb();
-        if (queueDepth >= _queue.getMaximumQueueDepth())
+        for(NotificationCheck check : NotificationCheck.values())
         {
-            notifyClients("Queue depth(" + queueDepth + "), Queue size has reached the threshold high value");
+            if(check.isMessageSpecific() || _lastNotificationTimes[check.ordinal()]<thresholdTime)
+            {
+                if(check.notifyIfNecessary(msg, _queue, this))
+                {
+                    _lastNotificationTimes[check.ordinal()] = currentTime;
+                }
+            }
         }
+
     }
 
     /**
      * Sends the notification to the listeners
      */
-    private void notifyClients(String notificationMsg)
+    public void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg)
     {
+        // Important: also write the alert to the log file, as monitoring tools may be looking for it
+        _logger.info(notification.name() + " On Queue " + queue.getName() + " - " + notificationMsg);
+
         Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
                 ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg);
 

Modified: incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=504056&r1=504055&r2=504056
==============================================================================
--- incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Tue Feb  6 01:57:35 2007
@@ -160,6 +160,12 @@
         return _totalMessageSize.get();
     }
 
+    public long getOldestMessageArrival()
+    {
+        AMQMessage msg = _messages.peek();
+        return msg == null ? Long.MAX_VALUE : msg.getArrivalTime();
+    }
+
 
     public synchronized List<AMQMessage> getMessages()
     {

Modified: incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java?view=diff&rev=504056&r1=504055&r2=504056
==============================================================================
--- incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java (original)
+++ incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java Tue Feb  6 01:57:35 2007
@@ -83,4 +83,6 @@
     boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) throws AMQException;
 
     long getTotalMessageSize();
+
+    long getOldestMessageArrival();
 }
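
Note on getOldestMessageArrival(): the ConcurrentSelectorDeliveryManager implementation in this commit returns Long.MAX_VALUE when the queue is empty. The sketch below illustrates why that sentinel is convenient for an age check: an empty queue can never look "too old". The class, the oldestMessageTooOld helper, and the treatment of a zero maximum age as "disabled" are assumptions made for illustration, not code from the commit.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch, not part of Qpid.
final class OldestArrivalSketch
{
    // Arrival timestamps in queue order; the head is the oldest message.
    private final Queue<Long> arrivalTimes = new ArrayDeque<Long>();

    void enqueue(long arrivalTime)
    {
        arrivalTimes.add(arrivalTime);
    }

    // Same shape as the committed method: a sentinel for the empty queue.
    long getOldestMessageArrival()
    {
        Long head = arrivalTimes.peek();
        return head == null ? Long.MAX_VALUE : head;
    }

    // Assumption: a maximum age of 0 means the check is switched off.
    boolean oldestMessageTooOld(long now, long maximumMessageAge)
    {
        return maximumMessageAge != 0 && getOldestMessageArrival() < now - maximumMessageAge;
    }

    public static void main(String[] args)
    {
        OldestArrivalSketch q = new OldestArrivalSketch();
        System.out.println(q.oldestMessageTooOld(50000L, 30000L)); // false: queue is empty
        q.enqueue(1000L);
        System.out.println(q.oldestMessageTooOld(50000L, 30000L)); // true: message is 49s old
    }
}
```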

Modified: incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java?view=diff&rev=504056&r1=504055&r2=504056
==============================================================================
--- incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java (original)
+++ incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java Tue Feb  6 01:57:35 2007
@@ -43,6 +43,8 @@
 
     private boolean _redelivered;
 
+    private long _arrivalTime;
+
     public InMemoryMessageHandle()
     {
     }
@@ -114,6 +116,7 @@
     {
         _publishBody = publishBody;
         _contentHeaderBody = contentHeaderBody;
+        _arrivalTime = System.currentTimeMillis();
     }
 
     public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException
@@ -130,4 +133,10 @@
     {
         // NO OP
     }
+
+    public long getArrivalTime()
+    {
+        return _arrivalTime;
+    }
+
 }

Modified: incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java?view=diff&rev=504056&r1=504055&r2=504056
==============================================================================
--- incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java (original)
+++ incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java Tue Feb  6 01:57:35 2007
@@ -31,11 +31,19 @@
 
     private int _contentChunkCount;
 
+    private long _arrivalTime;
+
     public MessageMetaData(BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount)
     {
+        this(publishBody,contentHeaderBody, contentChunkCount, System.currentTimeMillis());
+    }
+
+    public MessageMetaData(BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount, long arrivalTime)
+    {
         _contentHeaderBody = contentHeaderBody;
         _publishBody = publishBody;
         _contentChunkCount = contentChunkCount;
+        _arrivalTime = arrivalTime;
     }
 
     public int getContentChunkCount()
@@ -66,5 +74,15 @@
     public void setPublishBody(BasicPublishBody publishBody)
     {
         _publishBody = publishBody;
+    }
+
+    public long getArrivalTime()
+    {
+        return _arrivalTime;
+    }
+
+    public void setArrivalTime(long arrivalTime)
+    {
+        _arrivalTime = arrivalTime;
     }
 }

Modified: incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java?view=diff&rev=504056&r1=504055&r2=504056
==============================================================================
--- incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java (original)
+++ incubator/qpid/branches/perftesting_persistent/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java Tue Feb  6 01:57:35 2007
@@ -49,6 +49,8 @@
 
     private final MessageStore _messageStore;
 
+    private long _arrivalTime;
+
 
     public WeakReferenceMessageHandle(MessageStore messageStore)
     {
@@ -57,17 +59,30 @@
 
     public ContentHeaderBody getContentHeaderBody(Long messageId) throws AMQException
     {
-        ContentHeaderBody chb = (_contentHeaderBody != null?_contentHeaderBody.get():null);
+        ContentHeaderBody chb = (_contentHeaderBody != null ? _contentHeaderBody.get() : null);
         if (chb == null)
         {
-            MessageMetaData mmd = _messageStore.getMessageMetaData(messageId);
+            MessageMetaData mmd = loadMessageMetaData(messageId);
             chb = mmd.getContentHeaderBody();
-            _contentHeaderBody = new WeakReference<ContentHeaderBody>(chb);
-            _publishBody = new WeakReference<BasicPublishBody>(mmd.getPublishBody());
         }
         return chb;
     }
 
+    private MessageMetaData loadMessageMetaData(Long messageId)
+            throws AMQException
+    {
+        MessageMetaData mmd = _messageStore.getMessageMetaData(messageId);
+        populateFromMessageMetaData(mmd);
+        return mmd;
+    }
+
+    private void populateFromMessageMetaData(MessageMetaData mmd)
+    {
+        _arrivalTime = mmd.getArrivalTime();
+        _contentHeaderBody = new WeakReference<ContentHeaderBody>(mmd.getContentHeaderBody());
+        _publishBody = new WeakReference<BasicPublishBody>(mmd.getPublishBody());
+    }
+
     public int getBodyCount(Long messageId) throws AMQException
     {
         if (_contentBodies == null)
@@ -107,6 +122,7 @@
 
     /**
      * Content bodies are set <i>before</i> the publish and header frames
+     *
      * @param storeContext
      * @param messageId
      * @param contentBody
@@ -115,10 +131,9 @@
      */
     public void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentBody contentBody, boolean isLastContentBody) throws AMQException
     {
-        if(_contentBodies == null && isLastContentBody)
+        if (_contentBodies == null && isLastContentBody)
         {
-            _contentBodies = Collections.singletonList(new WeakReference<ContentBody>(contentBody));
-
+            _contentBodies = new ArrayList<WeakReference<ContentBody>>(1);
         }
         else
         {
@@ -126,22 +141,19 @@
             {
                 _contentBodies = new LinkedList<WeakReference<ContentBody>>();
             }
-
-
-            _contentBodies.add(new WeakReference<ContentBody>(contentBody));
         }
+        _contentBodies.add(new WeakReference<ContentBody>(contentBody));
         _messageStore.storeContentBodyChunk(storeContext, messageId, _contentBodies.size() - 1, contentBody, isLastContentBody);
     }
 
     public BasicPublishBody getPublishBody(Long messageId) throws AMQException
     {
-        BasicPublishBody bpb = (_publishBody != null?_publishBody.get():null);
+        BasicPublishBody bpb = (_publishBody != null ? _publishBody.get() : null);
         if (bpb == null)
         {
-            MessageMetaData mmd = _messageStore.getMessageMetaData(messageId);
+            MessageMetaData mmd = loadMessageMetaData(messageId);
+
             bpb = mmd.getPublishBody();
-            _publishBody = new WeakReference<BasicPublishBody>(bpb);
-            _contentHeaderBody = new WeakReference<ContentHeaderBody>(mmd.getContentHeaderBody());
         }
         return bpb;
     }
@@ -166,6 +178,7 @@
 
     /**
      * This is called when all the content has been received.
+     *
      * @param publishBody
      * @param contentHeaderBody
      * @throws AMQException
@@ -180,10 +193,15 @@
         {
             _contentBodies = new LinkedList<WeakReference<ContentBody>>();
         }
-        _messageStore.storeMessageMetaData(storeContext, messageId, new MessageMetaData(publishBody, contentHeaderBody,
-                                                                                        _contentBodies.size()));
-        _publishBody = new WeakReference<BasicPublishBody>(publishBody);
-        _contentHeaderBody = new WeakReference<ContentHeaderBody>(contentHeaderBody);
+
+        final long arrivalTime = System.currentTimeMillis();
+
+
+        MessageMetaData mmd = new MessageMetaData(publishBody, contentHeaderBody, _contentBodies.size(), arrivalTime);
+
+        _messageStore.storeMessageMetaData(storeContext, messageId, mmd);
+
+        populateFromMessageMetaData(mmd);
     }
 
     public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException
@@ -200,4 +218,10 @@
     {
         _messageStore.dequeueMessage(storeContext, queue.getName(), messageId);
     }
+
+    public long getArrivalTime()
+    {
+        return _arrivalTime;
+    }
+
 }

Modified: incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?view=diff&rev=504056&r1=504055&r2=504056
==============================================================================
--- incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Tue Feb  6 01:57:35 2007
@@ -215,12 +215,13 @@
         Exception lastException = new Exception();
         lastException.initCause(new ConnectException());
 
-        while (lastException != null && checkException(lastException) && _failoverPolicy.failoverAllowed())
+        while (!_connected && _failoverPolicy.failoverAllowed())
         {
             try
             {
                 makeBrokerConnection(_failoverPolicy.getNextBrokerDetails());
                 lastException = null;
+                _connected = true;
             }
             catch (Exception e)
             {
@@ -232,34 +233,7 @@
 
         _logger.debug("Are we connected:" + _connected);
 
-        // Then the Failover Thread will handle conneciton
-        if (_failoverPolicy.failoverAllowed())
-        {
-            //TODO this needs to be redone so that we are not spinning.
-            // A suitable object should be set that is then waited on
-            // and only notified when a connection is made or when
-            // the AMQConnection gets closed.
-            while (!_connected && !_closed.get())
-            {
-                try
-                {
-                    _logger.debug("Sleeping.");
-                    Thread.sleep(100);
-                }
-                catch (InterruptedException ie)
-                {
-                    _logger.debug("Woken up.");
-                }
-            }
-            if (!_failoverPolicy.failoverAllowed() || _failoverPolicy.getCurrentBrokerDetails() == null)
-            {
-                if (_lastAMQException != null)
-                {
-                    throw _lastAMQException;
-                }
-            }
-        }
-        else
+        if (!_connected)
         {
             String message = null;
 
@@ -318,7 +292,7 @@
 
     private void setVirtualHost(String virtualHost)
     {
-        if(virtualHost.startsWith("/"))
+        if (virtualHost.startsWith("/"))
         {
             virtualHost = virtualHost.substring(1);
         }
@@ -403,7 +377,14 @@
 
     public boolean failoverAllowed()
     {
-        return _failoverPolicy.failoverAllowed();
+        if (!_connected)
+        {
+            return false;
+        }
+        else
+        {
+            return _failoverPolicy.failoverAllowed();
+        }
     }
 
     public Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException
@@ -815,6 +796,11 @@
         return _protocolHandler;
     }
 
+    public boolean started()
+    {
+        return _started;
+    }
+
     public void bytesSent(long writtenBytes)
     {
         if (_connectionListener != null)
@@ -1031,4 +1017,5 @@
                 AMQConnectionFactory.class.getName(),
                 null);          // factory location
     }
+
 }
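
Reviewer sketch for the AMQConnection.java change above: the connect loop now keeps trying brokers until one attempt succeeds or the failover policy has nothing left to offer. The code below is a minimal, self-contained illustration of that loop shape only. `RoundRobinPolicy`, the broker names, and this `makeBrokerConnection` are hypothetical stand-ins, not the Qpid classes or their signatures.

```java
import java.net.ConnectException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class ConnectLoopSketch
{
    /** Hypothetical stand-in for a failover policy; not the Qpid FailoverPolicy API. */
    static class RoundRobinPolicy
    {
        private final Iterator<String> _brokers;

        RoundRobinPolicy(List<String> brokers)
        {
            _brokers = brokers.iterator();
        }

        boolean failoverAllowed()
        {
            return _brokers.hasNext();
        }

        String getNextBrokerDetails()
        {
            return _brokers.next();
        }
    }

    /** Hypothetical connect attempt: only the broker named "good" accepts. */
    static void makeBrokerConnection(String broker) throws ConnectException
    {
        if (!"good".equals(broker))
        {
            throw new ConnectException("cannot reach " + broker);
        }
    }

    /** Returns the broker that accepted the connection, or null if every attempt failed. */
    static String connect(RoundRobinPolicy policy)
    {
        boolean connected = false;
        String current = null;

        // Same condition shape as the patched loop: stop on success or when the policy is exhausted.
        while (!connected && policy.failoverAllowed())
        {
            current = policy.getNextBrokerDetails();
            try
            {
                makeBrokerConnection(current);
                connected = true;
            }
            catch (ConnectException e)
            {
                // Swallow the failure and let the loop move on to the next broker.
            }
        }

        return connected ? current : null;
    }

    public static void main(String[] args)
    {
        System.out.println(connect(new RoundRobinPolicy(Arrays.asList("bad1", "good", "bad2"))));
        System.out.println(connect(new RoundRobinPolicy(Arrays.asList("bad1", "bad2"))));
    }
}
```

With an explicit connected flag the caller can tell "never connected" apart from "connected and allowed to fail over later", which appears to be the distinction the new failoverAllowed() check relies on.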

Modified: incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java?view=diff&rev=504056&r1=504055&r2=504056
==============================================================================
--- incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java (original)
+++ incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java Tue Feb  6 01:57:35 2007
@@ -62,7 +62,7 @@
                                 String clientName, String virtualHost) throws URLSyntaxException
     {
         this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" +
-                                  username + ":" + password + "@" + clientName +
+                                  username + ":" + password + "@" + clientName + "/" +
                                   virtualHost + "?brokerlist='" + broker + "'"));
     }
 
@@ -334,7 +334,7 @@
 
                 if (addr != null)
                 {
-                    return new AMQQueue(new AMQBindingURL((String) addr.getContent()).getQueueName());
+                    return new AMQQueue(new AMQBindingURL((String) addr.getContent()));
                 }
             }
 
@@ -344,7 +344,7 @@
 
                 if (addr != null)
                 {
-                    return new AMQTopic(new AMQBindingURL((String) addr.getContent()).getDestinationName());
+                    return new AMQTopic(new AMQBindingURL((String) addr.getContent()));
                 }
             }
 

Modified: incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=504056&r1=504055&r2=504056
==============================================================================
--- incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Feb  6 01:57:35 2007
@@ -93,8 +93,6 @@
      */
     private final FlowControllingBlockingQueue _queue;
 
-    private final java.util.Queue<MessageConsumerPair> _reprocessQueue;
-
     private Dispatcher _dispatcher;
 
     private MessageFactoryRegistry _messageFactoryRegistry;
@@ -136,20 +134,6 @@
      */
     private long _nextProducerId;
 
-    /**
-     * Track the 'stopped' state of the dispatcher, a session starts in the stopped state.
-     */
-    private volatile AtomicBoolean _stopped = new AtomicBoolean(true);
-
-    /**
-     * Used to signal 'pausing' the dispatcher when setting a message listener on a consumer
-     */
-    private final AtomicBoolean _pausingDispatcher = new AtomicBoolean(false);
-
-    /**
-     * Used to signal 'pausing' the dispatcher when setting a message listener on a consumer
-     */
-    private final AtomicBoolean _pausedDispatcher = new AtomicBoolean(false);
 
     /**
      * Set when recover is called. This is to handle the case where recover() is called by application code
@@ -157,14 +141,24 @@
      */
     private boolean _inRecovery;
 
+    private boolean _connectionStopped;
+
     private boolean _hasMessageListeners;
 
     /**
      * Responsible for decoding a message fragment and passing it to the appropriate message consumer.
      */
-    
+
     private class Dispatcher extends Thread
     {
+
+        /**
+         * Tracks whether the dispatcher has been closed; a dispatcher starts open and close() sets this flag.
+         */
+        private final AtomicBoolean _closed = new AtomicBoolean(false);
+
+        private final Object _lock = new Object();
+
         public Dispatcher()
         {
             super("Dispatcher-Channel-" + _channelId);
@@ -173,12 +167,28 @@
         public void run()
         {
             UnprocessedMessage message;
-            _stopped.set(false);
+
             try
             {
-                while (!_stopped.get() && (message = (UnprocessedMessage) _queue.take()) != null)
+                while (!_closed.get() && (message = (UnprocessedMessage) _queue.take()) != null)
                 {
-                    dispatchMessage(message);
+                    synchronized (_lock)
+                    {
+
+                        while (connectionStopped())
+                        {
+                            _lock.wait();
+                        }
+
+                        dispatchMessage(message);
+
+                        while (connectionStopped())
+                        {
+                            _lock.wait();
+                        }
+
+                    }
+
                 }
             }
             catch (InterruptedException e)
@@ -189,6 +199,21 @@
             _logger.info("Dispatcher thread terminating for channel " + _channelId);
         }
 
+        // only call while holding lock
+        final boolean connectionStopped()
+        {
+            return _connectionStopped;
+        }
+
+        void setConnectionStopped(boolean connectionStopped)
+        {
+            synchronized (_lock)
+            {
+                _connectionStopped = connectionStopped;
+                _lock.notify();
+            }
+        }
+
         private void dispatchMessage(UnprocessedMessage message)
         {
             if (message.getDeliverBody() != null)
@@ -246,15 +271,17 @@
             }
         }
 
-        public void stopDispatcher()
+        public void close()
         {
-            _stopped.set(true);
+            _closed.set(true);
             interrupt();
+
+            //fixme awaitTermination
+
         }
     }
 
 
-
     AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
                MessageFactoryRegistry messageFactoryRegistry)
     {
@@ -285,8 +312,6 @@
         _defaultPrefetchHighMark = defaultPrefetchHighMark;
         _defaultPrefetchLowMark = defaultPrefetchLowMark;
 
-        _reprocessQueue = new ConcurrentLinkedQueue<MessageConsumerPair>();
-
         if (_acknowledgeMode == NO_ACKNOWLEDGE)
         {
             _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
@@ -446,7 +471,7 @@
         }
     }
 
-    
+
     public void rollback() throws JMSException
     {
         checkTransacted();
@@ -654,7 +679,8 @@
     {
         if (_dispatcher != null)
         {
-            _dispatcher.stopDispatcher();
+            _dispatcher.close();
+            _dispatcher = null;
         }
         // we need to clone the list of consumers since the close() method updates the _consumers collection
         // which would result in a concurrent modification exception
@@ -680,7 +706,8 @@
     {
         if (_dispatcher != null)
         {
-            _dispatcher.stopDispatcher();
+            _dispatcher.close();
+            _dispatcher = null;
         }
         // we need to clone the list of consumers since the close() method updates the _consumers collection
         // which would result in a concurrent modification exception
@@ -712,8 +739,8 @@
         }
         // TODO: Be aware of possible changes to parameter order as versions change.
         getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
-                                                                                    getProtocolMajorVersion(), getProtocolMinorVersion(),    // AMQP version (major, minor)
-                                                                                    false));    // requeue
+                                                                        getProtocolMajorVersion(), getProtocolMinorVersion(),    // AMQP version (major, minor)
+                                                                        false));    // requeue
     }
 
     boolean isInRecovery()
@@ -743,37 +770,36 @@
 
     public MessageListener getMessageListener() throws JMSException
     {
-        checkNotClosed();
+//        checkNotClosed();
         return _messageListener;
     }
 
     public void setMessageListener(MessageListener listener) throws JMSException
     {
-        checkNotClosed();
-
-        if (!isStopped())
-        {
-            throw new javax.jms.IllegalStateException("Attempt to set listener while session is started.");
-        }
-
-        // We are stopped         
-        for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
-        {
-            BasicMessageConsumer consumer = i.next();
-
-            if (consumer.isReceiving())
-            {
-                throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously.");
-            }
-        }
-
-        _messageListener = listener;
-        
-        for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
-        {
-            i.next().setMessageListener(_messageListener);
-        }
-
+//        checkNotClosed();
+//
+//        if (_dispatcher != null && !_dispatcher.connectionStopped())
+//        {
+//            throw new javax.jms.IllegalStateException("Attempt to set listener while session is started.");
+//        }
+//
+//        // We are stopped
+//        for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
+//        {
+//            BasicMessageConsumer consumer = i.next();
+//
+//            if (consumer.isReceiving())
+//            {
+//                throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously.");
+//            }
+//        }
+//
+//        _messageListener = listener;
+//
+//        for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
+//        {
+//            i.next().setMessageListener(_messageListener);
+//        }
 
     }
 
@@ -1582,13 +1608,17 @@
 
     void start()
     {
+        //fixme This should be controlled by _stopped as it pairs with the stop method
+        //fixme or check the FlowControllingBlockingQueue _queue to see if we have flow controlled.
+        // Otherwise every subsequent call to start() results in a Flow message being sent; we only need to do this
+        // if we have called stop.
         if (_startedAtLeastOnce.getAndSet(true))
         {
             //then we stopped this and are restarting, so signal server to resume delivery
             unsuspendChannel();
         }
 
-        if(hasMessageListeners() && _dispatcher == null)
+        if (hasMessageListeners())
         {
             startDistpatcherIfNecessary();
         }
@@ -1606,26 +1636,27 @@
 
     synchronized void startDistpatcherIfNecessary()
     {
-        if(_dispatcher == null)
+        if (_dispatcher == null)
         {
             _dispatcher = new Dispatcher();
             _dispatcher.setDaemon(true);
             _dispatcher.start();
         }
+        else
+        {
+            _dispatcher.setConnectionStopped(false);
+        }
     }
 
     void stop()
     {
         //stop the server delivering messages to this session
         suspendChannel();
-
-        //stop the dispatcher thread
-        _stopped.set(true);
-    }
-
-    boolean isStopped()
-    {
-        return _stopped.get();
+        
+        if (_dispatcher != null)
+        {
+            _dispatcher.setConnectionStopped(true);
+        }
     }
 
     /**
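
Reviewer sketch for the AMQSession.java change above: the dispatcher no longer terminates when the connection is stopped. It parks on a lock until it is told to resume, and a separate closed flag plus an interrupt ends the thread. The following is a minimal, self-contained sketch of that pause/resume pattern under stated assumptions. The class and method names are hypothetical, messages are plain strings, and `notifyAll()` is used here where the patch calls `notify()`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class PausableDispatcherSketch
{
    /** Hypothetical stand-in for the session's Dispatcher thread; not Qpid code. */
    static class Dispatcher extends Thread
    {
        private final BlockingQueue<String> _queue;
        private final List<String> _delivered;
        private final AtomicBoolean _closed = new AtomicBoolean(false);
        private final Object _lock = new Object();
        private boolean _stopped; // guarded by _lock

        Dispatcher(BlockingQueue<String> queue, List<String> delivered, boolean startStopped)
        {
            super("Dispatcher-Sketch");
            _queue = queue;
            _delivered = delivered;
            _stopped = startStopped;
            setDaemon(true);
        }

        public void run()
        {
            try
            {
                while (!_closed.get())
                {
                    String message = _queue.take();
                    synchronized (_lock)
                    {
                        // Loop rather than 'if' so that spurious wake-ups are harmless.
                        while (_stopped)
                        {
                            _lock.wait();
                        }
                        _delivered.add(message);
                    }
                }
            }
            catch (InterruptedException e)
            {
                // close() interrupts the thread; fall through and terminate.
            }
        }

        void setStopped(boolean stopped)
        {
            synchronized (_lock)
            {
                _stopped = stopped;
                _lock.notifyAll();
            }
        }

        void close()
        {
            _closed.set(true);
            interrupt();
        }
    }

    /** Returns {messages delivered while stopped, messages delivered after resuming}. */
    static int[] demo() throws InterruptedException
    {
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        List<String> delivered = Collections.synchronizedList(new ArrayList<String>());
        Dispatcher dispatcher = new Dispatcher(queue, delivered, true);
        dispatcher.start();

        queue.put("m1");
        queue.put("m2");
        Thread.sleep(200); // give the dispatcher time to take m1 and park on the lock
        int whileStopped = delivered.size();

        dispatcher.setStopped(false);
        for (int i = 0; i < 200 && delivered.size() < 2; i++)
        {
            Thread.sleep(10); // poll for up to about two seconds
        }
        int afterResume = delivered.size();

        dispatcher.close();
        dispatcher.join(2000);
        return new int[] { whileStopped, afterResume };
    }

    public static void main(String[] args) throws InterruptedException
    {
        int[] counts = demo();
        System.out.println("delivered while stopped: " + counts[0]);
        System.out.println("delivered after resume: " + counts[1]);
    }
}
```

Note that a message already taken from the queue is held, not lost, while the dispatcher is parked; it is delivered as soon as the stopped flag is cleared.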

Modified: incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java?view=diff&rev=504056&r1=504055&r2=504056
==============================================================================
--- incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java (original)
+++ incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java Tue Feb  6 01:57:35 2007
@@ -28,12 +28,12 @@
 import javax.jms.Topic;
 
 public class AMQTopic extends AMQDestination implements Topic
-    {
+{
     /**
-    * Constructor for use in creating a topic using a BindingURL.
+     * Constructor for use in creating a topic using a BindingURL.
      *
      * @param binding The binding url object.
-    */
+     */
     public AMQTopic(BindingURL binding)
     {
         super(binding);
@@ -78,7 +78,7 @@
         return super.getDestinationName().toString();
     }
 
-     public AMQShortString getRoutingKey()
+    public AMQShortString getRoutingKey()
     {
         return getDestinationName();
     }
@@ -93,7 +93,7 @@
      * Override since the queue is always private and we must ensure it remains null. If not,
      * reuse of the topic when registering consumers will make all consumers listen on the same (private) queue rather
      * than getting their own (private) queue.
-     *
+     * <p/>
      * This is relatively nasty but it is difficult to come up with a more elegant solution, given
      * the requirement in the case on AMQQueue and possibly other AMQDestination subclasses to
      * use the underlying queue name even where it is server generated.

Modified: incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=504056&r1=504055&r2=504056
==============================================================================
--- incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Tue Feb  6 01:57:35 2007
@@ -212,16 +212,15 @@
 
         //i.e. it is only valid to call this method if
         //
-        //    (a) the session is stopped, in which case the dispatcher is not running
+        //    (a) the connection is stopped, in which case the dispatcher is not running
         //    OR
         //    (b) the listener is null AND we are not receiving synchronously at present
         //
 
-        if (_session.isStopped())
+        if (!_session.getAMQConnection().started())
         {
             _messageListener.set(messageListener);
             _session.setHasMessageListeners();
-            _session.startDistpatcherIfNecessary();
 
             if (_logger.isDebugEnabled())
             {
@@ -248,7 +247,6 @@
 
                 synchronized (_session)
                 {
-                    
                     _messageListener.set(messageListener);
                     _session.setHasMessageListeners();
                     _session.startDistpatcherIfNecessary();
@@ -329,12 +327,13 @@
 
     public Message receive(long l) throws JMSException
     {
-        _session.startDistpatcherIfNecessary();
 
         checkPreConditions();
 
         acquireReceiving();
 
+        _session.startDistpatcherIfNecessary();
+
         try
         {
             if (closeOnAutoClose())
@@ -385,12 +384,12 @@
 
     public Message receiveNoWait() throws JMSException
     {
-        _session.startDistpatcherIfNecessary();
-
         checkPreConditions();
 
         acquireReceiving();
 
+        _session.startDistpatcherIfNecessary();
+
         try
         {
             if (closeOnAutoClose())
@@ -560,7 +559,6 @@
             }
             else
             {
-                //This shouldn't be possible.
                 _synchronousQueue.put(jmsMessage);
             }
         }

Modified: incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?view=diff&rev=504056&r1=504055&r2=504056
==============================================================================
--- incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Tue Feb  6 01:57:35 2007
@@ -388,9 +388,7 @@
             }
             else
             {
-                //TODO; Do we really want to create an empty message here ?
-                newMessage = (AbstractJMSMessage) _session.createMessage();
-                return new MessageConverter(newMessage).getConvertedMessage();
+                newMessage = new MessageConverter(message).getConvertedMessage();
             }
 
             if (newMessage != null)

Modified: incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java?view=diff&rev=504056&r1=504055&r2=504056
==============================================================================
--- incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java (original)
+++ incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java Tue Feb  6 01:57:35 2007
@@ -269,7 +269,7 @@
                         s = String.valueOf(o);
                     }
                 }
-            }
+            }//else return s // null; 
         }
 
         return s;
@@ -508,16 +508,16 @@
 
             // JMS invalid names
             if ((propertyName.equals("NULL")
-                    || propertyName.equals("TRUE")
-                    || propertyName.equals("FALSE")
-                    || propertyName.equals("NOT")
-                    || propertyName.equals("AND")
-                    || propertyName.equals("OR")
-                    || propertyName.equals("BETWEEN")
-                    || propertyName.equals("LIKE")
-                    || propertyName.equals("IN")
-                    || propertyName.equals("IS")
-                    || propertyName.equals("ESCAPE")))
+                 || propertyName.equals("TRUE")
+                 || propertyName.equals("FALSE")
+                 || propertyName.equals("NOT")
+                 || propertyName.equals("AND")
+                 || propertyName.equals("OR")
+                 || propertyName.equals("BETWEEN")
+                 || propertyName.equals("LIKE")
+                 || propertyName.equals("IN")
+                 || propertyName.equals("IS")
+                 || propertyName.equals("ESCAPE")))
             {
                 throw new IllegalArgumentException("Identifier '" + propertyName + "' is not allowed in JMS");
             }

Modified: incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java?view=diff&rev=504056&r1=504055&r2=504056
==============================================================================
--- incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java (original)
+++ incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java Tue Feb  6 01:57:35 2007
@@ -25,7 +25,8 @@
 import javax.jms.*;
 import java.util.Enumeration;
 
-public class MessageConverter {
+public class MessageConverter
+{
 
     /**
      * Log4J logger
@@ -114,6 +115,16 @@
         {
             //we're at the end so don't mind the exception
         }
+        _newMessage = (AbstractJMSMessage) nativeMessage;
+        setMessageProperties(message);
+    }
+
+    public MessageConverter(Message message) throws JMSException
+    {
+        //Send a message with just properties.
+        // Throwing away content
+        BytesMessage nativeMessage = new JMSBytesMessage();
+
         _newMessage = (AbstractJMSMessage) nativeMessage;
         setMessageProperties(message);
     }

Modified: incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java?view=diff&rev=504056&r1=504055&r2=504056
==============================================================================
--- incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java (original)
+++ incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java Tue Feb  6 01:57:35 2007
@@ -225,6 +225,7 @@
             }
             if(_currentState != s)
             {
+                _logger.warn("State not achieved within permitted time.  Current state " + _currentState + ", desired state: " + s);
                 throw new AMQException("State not achieved within permitted time.  Current state " + _currentState + ", desired state: " + s);
             }
         }