You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/02/05 15:43:17 UTC
svn commit: r503703 [1/2] - in
/incubator/qpid/branches/perftesting/qpid/java: ./ broker/etc/
broker/src/main/java/org/apache/qpid/server/configuration/
broker/src/main/java/org/apache/qpid/server/exchange/
broker/src/main/java/org/apache/qpid/server/q...
Author: ritchiem
Date: Mon Feb 5 06:43:14 2007
New Revision: 503703
URL: http://svn.apache.org/viewvc?view=rev&rev=503703
Log:
Revision: 503646
Author: rgreig
Date: 11:28:57, 05 February 2007
Message:
(Submitted by Rupert Smith)
This local repository is no longer needed. JUnit-Toolkit snapshot repository is now hosted on sourceforge: http://junit-toolkit.svn.sourceforge.net/svnroot/junit-toolkit/. A release is also in progress to the central maven repository.
----
Deleted : /incubator/qpid/trunk/qpid/java/mvn-repo
Revision: 503637
Author: rgreig
Date: 11:17:08, 05 February 2007
Message:
(Submitted by Rupert Smith)
Junit-toolkit has now fully migrated onto sourceforge. Snapshot repository location updated.
----
Modified : /incubator/qpid/trunk/qpid/java/perftests/pom.xml
Revision: 503609
Author: ritchiem
Date: 09:49:59, 05 February 2007
Message:
Update to performance testing to allow the use of shared destinations. This allows topics to have multiple consumers and the total message counts to be updated correctly.
----
Modified : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
Modified : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
Modified : /incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
Modified : /incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
Revision: 503604
Author: rgreig
Date: 09:40:04, 05 February 2007
Message:
QPID-326 : Patch supplied by Rob Godfrey - add oldest message on queue notification, and log notifications in log file
----
Modified : /incubator/qpid/trunk/qpid/java/broker/etc/virtualhosts.xml
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java
Added : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
Added : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
Revision: 503593
Author: ritchiem
Date: 08:58:30, 05 February 2007
Message:
Fixed bug in stop(). If a connection was opened but not start()ed and then closed, a NullPointerException was thrown because the Dispatcher had not been created.
----
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Revision: 502655
Author: rgreig
Date: 16:59:14, 02 February 2007
Message:
(Submitted by Rupert Smith) Options moved to top of constructor. They were at the bottom and not being used!
----
Modified : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
Revision: 502627
Author: rgreig
Date: 15:31:30, 02 February 2007
Message:
(Submitted by Rupert Smith)
Fixed problem with losing message results. Was not passing in self generated message correlation id in the async test, to match up replies with.
----
Modified : /incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
Revision: 502620
Author: rgreig
Date: 15:09:08, 02 February 2007
Message:
(Submitted by Rupert Smith)
Perftests improved with better timeout handling. Shared/unique destinations to ping now an option.
TestRunner now runs all per-thread setups, synchs all threads, then runs tests, synchs all threads, then runs tear downs.
----
Deleted : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.jar
Deleted : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.jar.md5
Deleted : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.jar.sha1
Deleted : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.pom
Deleted : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.pom.md5
Deleted : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.pom.sha1
Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar
Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar.md5
Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar.sha1
Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom
Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom.md5
Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom.sha1
Modified : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml
Modified : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.md5
Modified : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.sha1
Modified : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml
Modified : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.md5
Modified : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.sha1
Modified : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
Modified : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
Modified : /incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
Modified : /incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
Modified : /incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java
Revision: 502610
Author: bhupendrab
Date: 14:26:32, 02 February 2007
Message:
QPID-84
Tests for FSContextFactory deleted. fscontext.jar is not part of apache svn.
----
Modified : /incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/PropertiesFileInitialContextFactoryTest.java
Deleted : /incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest
Revision: 502576
Author: ritchiem
Date: 11:13:13, 02 February 2007
Message:
QPID-343 Performance test suite doesn't output missing message count on failure.
Updated PingAsyncTestPerf to output missing message count.
Updated PingPongProducer so it doesn't use AMQShortStringx.
----
Modified : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
Modified : /incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
Added:
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java (with props)
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java (with props)
Removed:
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
incubator/qpid/branches/perftesting/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/
incubator/qpid/branches/perftesting/qpid/java/mvn-repo/
incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java
Modified:
incubator/qpid/branches/perftesting/qpid/java/AppliedPatches.txt
incubator/qpid/branches/perftesting/qpid/java/broker/etc/log4j.xml
incubator/qpid/branches/perftesting/qpid/java/broker/etc/virtualhosts.xml
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java
incubator/qpid/branches/perftesting/qpid/java/perftests/pom.xml
incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java
incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java
incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
Modified: incubator/qpid/branches/perftesting/qpid/java/AppliedPatches.txt
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/AppliedPatches.txt?view=diff&rev=503703&r1=503702&r2=503703
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/AppliedPatches.txt (original)
+++ incubator/qpid/branches/perftesting/qpid/java/AppliedPatches.txt Mon Feb 5 06:43:14 2007
@@ -3,7 +3,7 @@
- Hard coded to return 0
-Latest Revision-
-502576.499979
+503646,502576.499979
-Not Done-
502610 QPID-84 UPdate PropertiesFileInitialContextFactoryTest Remove referencetest
@@ -18,7 +18,6 @@
501008 QPID-324 change sending of destination to byte representing T/Q/Other
501007 QPID-322 Test may habg isntead of fail of messages doesn't get through
501005 QPID-324 change sending of destination to byte representing T/Q/Other
-501003 QPID-320 Imporve performance by remembering protocol version
500310 Patch to TransactedTest to clean up messages
500264 Change to Transacted Test
499874 QPID-319 management console update (QPID-50)
@@ -64,8 +63,19 @@
-PARTIAL-
494650 QPID-268 Improvements to performance of generated code
+501003 QPID-320 Imporve performance by remembering protocol version
+ * DeliveryManager Queue data size counting.
-Done-
+503646 local repository removed
+503637 Junit-toolkit update
+503609 Updated to performacen test to allow the use of shared destinations.
+503604 QPID-326 add oldest Message on queue Notification and log notification in log file
+503593 Fixed bug in session . stop() where it would NPE on close if not start()ed
+502655 Moved option sets to top of constructor
+502627 Fixed problem with losing message results.
+502620 Improved timeout handling
+502576 QPID-343 Performance test suite doesn't output missing message count on failure
502627 Fixed problem with losing message results
502620 Performance test improvement with better timeout handing
502576 QPID-343 performacne test suite doesn't output missing message count
@@ -89,7 +99,6 @@
501010 QPID-322 Message reference count not incremented when added to unackMap.
501004 QPID-320 Simplify MessageListener setup
500284 Updated script details and added guard for traffic light being null
--
499979 QPID-315 Test classes to reproduce missing correlation id
499975 QPID-315 Moved message converstion to MessageConverter.
499781 Fixed race that caused duplicate date in log file
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/etc/log4j.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/etc/log4j.xml?view=diff&rev=503703&r1=503702&r2=503703
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/etc/log4j.xml (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/etc/log4j.xml Mon Feb 5 06:43:14 2007
@@ -42,6 +42,10 @@
<priority value="info"/>
</category>
+ <category name="org.apache.qpid.server.queue.AMQQueueMBean">
+ <priority value="info"/>
+ </category>
+
<category name="org.apache.qpid">
<priority value="warn"/>
</category>
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/etc/virtualhosts.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/etc/virtualhosts.xml?view=diff&rev=503703&r1=503702&r2=503703
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/etc/virtualhosts.xml (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/etc/virtualhosts.xml Mon Feb 5 06:43:14 2007
@@ -21,8 +21,79 @@
-->
<virtualhosts>
<virtualhost>
- <path>/development</path>
- <bind>direct://amq.direct//queue</bind>
- <bind>direct://amq.direct//ping</bind>
+ <name>localhost</name>
+
+ <localhost>
+ <minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
+ <maximumMessageCount>5000</maximumMessageCount>
+ <queue>
+ <name>queue</name>
+ <queue>
+ <exchange>amq.direct</exchange>
+ <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
+ </queue>
+ </queue>
+ <queue>
+ <name>ping</name>
+ <ping>
+ <exchange>amq.direct</exchange>
+ <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
+ </ping>
+ </queue>
+ </localhost>
+ </virtualhost>
+ <virtualhost>
+ <name>development</name>
+ <minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
+ <maximumMessageCount>5000</maximumMessageCount>
+ <development>
+ <queue>
+ <name>queue</name>
+ <queue>
+ <exchange>amq.direct</exchange>
+ <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
+ </queue>
+ </queue>
+ <queue>
+ <name>ping</name>
+ <ping>
+ <exchange>amq.direct</exchange>
+ <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
+ </ping>
+ </queue>
+ </development>
+ </virtualhost>
+ <virtualhost>
+ <name>test</name>
+ <minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
+ <maximumMessageCount>5000</maximumMessageCount>
+ <test>
+ <queue>
+ <name>queue</name>
+ <queue>
+ <exchange>amq.direct</exchange>
+ <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
+ </queue>
+ </queue>
+ <queue>
+ <name>ping</name>
+ <ping>
+ <exchange>amq.direct</exchange>
+ <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
+ </ping>
+ </queue>
+ </test>
</virtualhost>
</virtualhosts>
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java?view=diff&rev=503703&r1=503702&r2=503703
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java Mon Feb 5 06:43:14 2007
@@ -32,9 +32,13 @@
import org.apache.log4j.Logger;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.CompositeConfiguration;
import java.util.Collection;
+import java.util.List;
+import java.util.Collections;
public class VirtualHostConfiguration
{
@@ -42,11 +46,7 @@
XMLConfiguration _config;
- private static final String XML_VIRTUALHOST = "virtualhost";
- private static final String XML_PATH = "path";
- private static final String XML_BIND = "bind";
- private static final String XML_VIRTUALHOST_PATH = "virtualhost.path";
- private static final String XML_VIRTUALHOST_BIND = "virtualhost.bind";
+ private static final String VIRTUALHOST_PROPERTY_BASE = "virtualhost.";
public VirtualHostConfiguration(String configFile) throws ConfigurationException
@@ -55,135 +55,58 @@
_config = new XMLConfiguration(configFile);
- if (_config.getProperty(XML_VIRTUALHOST_PATH) == null)
- {
- throw new ConfigurationException(
- "Virtualhost Configuration document does not contain a valid virtualhost.");
- }
}
- public void performBindings() throws AMQException, ConfigurationException, URLSyntaxException
- {
- Object prop = _config.getProperty(XML_VIRTUALHOST_PATH);
- if (prop instanceof Collection)
- {
- _logger.debug("Number of VirtualHosts: " + ((Collection) prop).size());
- int virtualhosts = ((Collection) prop).size();
- for (int vhost = 0; vhost < virtualhosts; vhost++)
- {
- loadVirtualHost(vhost);
- }
- }
- else
- {
- loadVirtualHost(-1);
- }
- }
-
- private void loadVirtualHost(int index) throws AMQException, ConfigurationException, URLSyntaxException
+ private void configureVirtualHost(String virtualHostName, Configuration configuration) throws ConfigurationException, AMQException
{
- String path = XML_VIRTUALHOST;
+ _logger.debug("Loding configuration for virtaulhost: "+virtualHostName);
- if (index != -1)
+ if(virtualHostName == null)
{
- path = path + "(" + index + ")";
- }
-
- Object prop = _config.getProperty(path + "." + XML_PATH);
-
- if (prop == null)
- {
- prop = _config.getProperty(path + "." + XML_BIND);
- String error = "Virtual Host not defined for binding";
-
- if (prop != null)
- {
- if (prop instanceof Collection)
- {
- error += "s";
- }
-
- error += ": " + prop;
- }
-
- throw new ConfigurationException(error);
+ throw new ConfigurationException("Unknown virtual host: " + virtualHostName);
}
- _logger.info("VirtualHost:'" + prop + "'");
+ List queueNames = configuration.getList("queue.name");
- prop = _config.getProperty(path + "." + XML_BIND);
- if (prop instanceof Collection)
+ for(Object queueNameObj : queueNames)
{
- int bindings = ((Collection) prop).size();
- _logger.debug("Number of Bindings: " + bindings);
- for (int dest = 0; dest < bindings; dest++)
- {
- loadBinding(path, dest);
- }
+ String queueName = String.valueOf(queueNameObj);
+ configureQueue(queueName, configuration);
}
- else
- {
- loadBinding(path, -1);
- }
- }
- private void loadBinding(String rootpath, int index) throws AMQException, ConfigurationException, URLSyntaxException
- {
- String path = rootpath + "." + XML_BIND;
- if (index != -1)
- {
- path = path + "(" + index + ")";
- }
-
- String bindingString = _config.getString(path);
-
- AMQBindingURL binding = new AMQBindingURL(bindingString);
-
- _logger.debug("Loaded Binding:" + binding);
-
- try
- {
- bind(binding);
- }
- catch (AMQException amqe)
- {
- _logger.info("Unable to bind url: " + binding);
- throw amqe;
- }
}
- private void bind(AMQBindingURL binding) throws AMQException, ConfigurationException
+ private void configureQueue(String queueName, Configuration configuration) throws AMQException, ConfigurationException
{
+ CompositeConfiguration queueConfiguration = new CompositeConfiguration();
- String queueName = binding.getQueueName();
-
- // This will occur if the URL is a Topic
- if (queueName == null)
- {
- //todo register valid topic
- ///queueName = binding.getDestinationName();
- throw new AMQException("Topics cannot be bound. TODO Register valid topic");
- }
+ queueConfiguration.addConfiguration(configuration.subset("queue."+ queueName));
+ queueConfiguration.addConfiguration(configuration);
- //Get references to Broker Registries
QueueRegistry queueRegistry = ApplicationRegistry.getInstance().getQueueRegistry();
MessageStore messageStore = ApplicationRegistry.getInstance().getMessageStore();
ExchangeRegistry exchangeRegistry = ApplicationRegistry.getInstance().getExchangeRegistry();
+ AMQQueue queue;
+
synchronized (queueRegistry)
{
- AMQQueue queue = queueRegistry.getQueue(queueName);
+ queue = queueRegistry.getQueue(queueName);
if (queue == null)
{
- _logger.info("Queue '" + binding.getQueueName() + "' does not exists. Creating.");
+ _logger.info("Creating queue '" + queueName + "' [on virtual host ]" );
+
+ boolean durable = queueConfiguration.getBoolean("durable" ,false);
+ boolean autodelete = queueConfiguration.getBoolean("autodelete", false);
+ String owner = queueConfiguration.getString("owner", null);
queue = new AMQQueue(queueName,
- Boolean.parseBoolean(binding.getOption(AMQBindingURL.OPTION_DURABLE)),
- null /* These queues will have no owner */,
- false /* Therefore autodelete makes no sence */, queueRegistry);
+ durable,
+ owner == null ? null : owner /* These queues will have no owner */,
+ autodelete /* Therefore autodelete makes no sence */, queueRegistry);
if (queue.isDurable())
{
@@ -194,27 +117,67 @@
}
else
{
- _logger.info("Queue '" + binding.getQueueName() + "' already exists not creating.");
+ _logger.info("Queue '" + queueName + "' already exists [on virtual host ], not creating.");
}
- Exchange defaultExchange = exchangeRegistry.getExchange(binding.getExchangeName());
- synchronized (defaultExchange)
+ String exchangeName = queueConfiguration.getString("exchange", null);
+
+ Exchange exchange = exchangeRegistry.getExchange(exchangeName == null ? null : exchangeName);
+
+ if(exchange == null)
{
- if (defaultExchange == null)
- {
- throw new ConfigurationException("Attempt to bind queue to unknown exchange:" + binding);
- }
+ exchange = exchangeRegistry.getDefaultExchange();
+ }
- defaultExchange.registerQueue(queue.getName(), queue, null);
+ if (exchange == null)
+ {
+ throw new ConfigurationException("Attempt to bind queue to unknown exchange:" + exchangeName);
+ }
- if (binding.getRoutingKey() == null || binding.getRoutingKey().equals(""))
+ synchronized (exchange)
+ {
+ List routingKeys = queueConfiguration.getList("routingKey");
+ if(routingKeys == null || routingKeys.isEmpty())
{
- throw new ConfigurationException("Unknown binding not specified on url:" + binding);
+ routingKeys = Collections.singletonList(queue.getName());
}
- queue.bind(binding.getRoutingKey(), defaultExchange);
+ for(Object routingKey : routingKeys)
+ {
+ exchange.registerQueue((String)routingKey, queue, null);
+
+ queue.bind((String)routingKey, exchange);
+
+ _logger.info("Queue '" + queue.getName() + "' bound to exchange:" + exchangeName + " RK:'" + routingKey + "'");
+ }
}
- _logger.info("Queue '" + queue.getName() + "' bound to exchange:" + binding.getExchangeName() + " RK:'" + binding.getRoutingKey() + "'");
+
}
+
+
+ Configurator.configure(queue);//, queueConfiguration);
}
+
+
+ public void performBindings() throws AMQException, ConfigurationException
+ {
+ List virtualHostNames = _config.getList(VIRTUALHOST_PROPERTY_BASE + "name");
+
+ _logger.info("Configuring " + virtualHostNames == null ? 0 : virtualHostNames.size() + " virtual hosts: " + virtualHostNames);
+
+ for(Object nameObject : virtualHostNames)
+ {
+ String name = String.valueOf(nameObject);
+ configureVirtualHost(name, _config.subset(VIRTUALHOST_PROPERTY_BASE + name));
+ }
+
+ if (virtualHostNames == null || virtualHostNames.isEmpty())
+ {
+ throw new ConfigurationException(
+ "Virtualhost Configuration document does not contain a valid virtualhost.");
+ }
+ }
+
+
+
}
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java?view=diff&rev=503703&r1=503702&r2=503703
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java Mon Feb 5 06:43:14 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,10 +20,10 @@
*/
package org.apache.qpid.server.exchange;
+import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.protocol.ExchangeInitialiser;
import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.log4j.Logger;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -37,6 +37,8 @@
*/
private ConcurrentMap<String, Exchange> _exchangeMap = new ConcurrentHashMap<String, Exchange>();
+ private Exchange _defaultExchange;
+
public DefaultExchangeRegistry(ExchangeFactory exchangeFactory)
{
//create 'standard' exchanges:
@@ -52,9 +54,23 @@
public void registerExchange(Exchange exchange)
{
+ if(_defaultExchange == null)
+ {
+ setDefaultExchange(exchange);
+ }
_exchangeMap.put(exchange.getName(), exchange);
}
+ public void setDefaultExchange(Exchange exchange)
+ {
+ _defaultExchange = exchange;
+ }
+
+ public Exchange getDefaultExchange()
+ {
+ return _defaultExchange;
+ }
+
public void unregisterExchange(String name, boolean inUse) throws AMQException
{
// TODO: check inUse argument
@@ -71,7 +87,16 @@
public Exchange getExchange(String name)
{
- return _exchangeMap.get(name);
+
+ if(name == null || name.length() == 0)
+ {
+ return _defaultExchange;
+ }
+ else
+ {
+ return _exchangeMap.get(name);
+ }
+
}
/**
@@ -82,14 +107,15 @@
public void routeContent(AMQMessage payload) throws AMQException
{
final String exchange = payload.getPublishBody().exchange;
- final Exchange exch = _exchangeMap.get(exchange);
+ final Exchange exch = getExchange(exchange);
// there is a small window of opportunity for the exchange to be deleted in between
- // the JmsPublish being received (where the exchange is validated) and the final
+ // the BasicPublish being received (where the exchange is validated) and the final
// content body being received (which triggers this method)
+ // TODO: check where the exchange is validated
if (exch == null)
{
throw new AMQException("Exchange '" + exchange + "' does not exist");
}
exch.route(payload);
}
-}
+}
\ No newline at end of file
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java?view=diff&rev=503703&r1=503702&r2=503703
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java Mon Feb 5 06:43:14 2007
@@ -37,4 +37,6 @@
void unregisterExchange(String name, boolean inUse) throws ExchangeInUseException, AMQException;
Exchange getExchange(String name);
+
+ Exchange getDefaultExchange();
}
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=503703&r1=503702&r2=503703
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Mon Feb 5 06:43:14 2007
@@ -28,6 +28,7 @@
import org.apache.qpid.server.message.MessageDecorator;
import org.apache.qpid.server.message.jms.JMSMessage;
import org.apache.qpid.AMQException;
+import org.apache.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
@@ -43,6 +44,8 @@
*/
public class AMQMessage
{
+ private static final Logger _log = Logger.getLogger(AMQMessage.class);
+
public static final String JMS_MESSAGE = "jms.message";
private final Set<Object> _tokens = new HashSet<Object>();
@@ -61,6 +64,8 @@
private final AtomicInteger _referenceCount = new AtomicInteger(1);
+ private long _arrivalTime;
+
/**
* Keeps a track of how many bytes we have received in body frames
*/
@@ -157,20 +162,20 @@
public CompositeAMQDataBlock getDataBlock(int channel, String consumerTag, long deliveryTag)
{
-
+
AMQFrame[] allFrames = new AMQFrame[2 + _contentBodies.size()];
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
allFrames[0] = BasicDeliverBody.createAMQFrame(channel,
- (byte)8, (byte)0, // AMQP version (major, minor)
- consumerTag, // consumerTag
- deliveryTag, // deliveryTag
- getExchangeName(), // exchange
- _redelivered, // redelivered
- getRoutingKey() // routingKey
- );
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ consumerTag, // consumerTag
+ deliveryTag, // deliveryTag
+ getExchangeName(), // exchange
+ _redelivered, // redelivered
+ getRoutingKey() // routingKey
+ );
allFrames[1] = ContentHeaderBody.createAMQFrame(channel, _contentHeaderBody);
for (int i = 2; i < allFrames.length; i++)
{
@@ -201,6 +206,8 @@
public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) throws AMQException
{
_contentHeaderBody = contentHeaderBody;
+ _arrivalTime = System.currentTimeMillis();
+
if (_storeWhenComplete && isAllContentReceived())
{
storeMessage();
@@ -223,6 +230,7 @@
_bodyLengthReceived += contentBody.getSize();
if (_storeWhenComplete && isAllContentReceived())
{
+ _arrivalTime = System.currentTimeMillis();
storeMessage();
}
}
@@ -263,6 +271,12 @@
_redelivered = redelivered;
}
+
+ public long getArrivalTime()
+ {
+ return _arrivalTime;
+ }
+
public long getMessageId()
{
return _messageId;
@@ -299,6 +313,7 @@
throw new MessageCleanupException(_messageId, e);
}
}
+
}
public void setPublisher(AMQProtocolSession publisher)
@@ -367,11 +382,17 @@
return _txnBuffer;
}
+ public long getSize()
+ {
+ return getContentHeaderBody().bodySize;
+ }
+
/**
* Called to enforce the 'immediate' flag.
+ *
* @throws NoConsumersException if the message is marked for
- * immediate delivery but has not been marked as delivered to a
- * consumer
+ * immediate delivery but has not been marked as delivered to a
+ * consumer
*/
public void checkDeliveredToConsumer() throws NoConsumersException
{
@@ -393,7 +414,8 @@
/**
* Called selectors to determin if the message has already been sent
- * @return _deliveredToConsumer
+ *
+ * @return _deliveredToConsumer
*/
public boolean getDeliveredToConsumer()
{
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=503703&r1=503702&r2=503703
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Mon Feb 5 06:43:14 2007
@@ -22,6 +22,7 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.configuration.Configured;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.management.Managable;
@@ -90,22 +91,36 @@
/**
* max allowed size(KB) of a single message
*/
- private long _maxMessageSize = 10000;
+ private long _maximumMessageSize = 10000;
/**
* max allowed number of messages on a queue.
*/
- private Integer _maxMessageCount = 10000;
+ @Configured(path = "maximumMessageCount", defaultValue = "0")
+ public int _maximumMessageCount;
/**
- * max queue depth(KB) for the queue
+ * max queue depth for the queue
*/
- private long _maxQueueDepth = 10000000;
+ @Configured(path = "maximumQueueDepth", defaultValue = "0")
+ public long _maximumQueueDepth = 10000000;
+
+ /*
+ * maximum message age before alerts occur
+ */
+ @Configured(path = "maximumMessageAge", defaultValue = "0")
+ public long _maximumMessageAge = 30000; //0
+
+ /*
+ * the minimum interval between sending out consecutive alerts of the same type
+ */
+ @Configured(path = "minimumAlertRepeatGap", defaultValue = "0")
+ public long _minimumAlertRepeatGap = 30000;
/**
* total messages received by the queue since startup.
*/
- private long _totalMessagesReceived = 0;
+ public long _totalMessagesReceived = 0;
public int compareTo(Object o)
{
@@ -183,35 +198,13 @@
_autoDelete = autoDelete;
_queueRegistry = queueRegistry;
_asyncDelivery = asyncDelivery;
+
_managedObject = createMBean();
_managedObject.register();
+
_subscribers = subscribers;
_subscriptionFactory = subscriptionFactory;
-
- //fixme - Make this configurable via the broker config.xml
- if (System.getProperties().getProperty("deliverymanager") != null)
- {
- if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentSelectorDeliveryManager"))
- {
- _logger.info("Using ConcurrentSelectorDeliveryManager");
- _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
- }
- else if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentDeliveryManager"))
- {
- _logger.info("Using ConcurrentDeliveryManager");
- _deliveryMgr = new ConcurrentDeliveryManager(_subscribers, this);
- }
- else
- {
- _logger.info("Using SynchronizedDeliveryManager");
- _deliveryMgr = new SynchronizedDeliveryManager(_subscribers, this);
- }
- }
- else
- {
- _logger.info("Using Default DeliveryManager: ConcurrentSelectorDeliveryManager");
- _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
- }
+ _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
}
private AMQQueueMBean createMBean() throws AMQException
@@ -267,6 +260,11 @@
return _deliveryMgr.getMessages();
}
+ public long getQueueDepth()
+ {
+ return _deliveryMgr.getTotalMessageSize();
+ }
+
/**
* @param messageId
* @return AMQMessage with give id if exists. null if AMQMessage with given id doesn't exist.
@@ -295,50 +293,55 @@
return _managedObject;
}
- public Long getMaximumMessageSize()
+ public long getMaximumMessageSize()
{
- return _maxMessageSize;
+ return _maximumMessageSize;
}
- public void setMaximumMessageSize(Long value)
+ public void setMaximumMessageSize(long value)
{
- _maxMessageSize = value;
+ _maximumMessageSize = value;
}
- public Integer getConsumerCount()
+ public int getConsumerCount()
{
return _subscribers.size();
}
- public Integer getActiveConsumerCount()
+ public int getActiveConsumerCount()
{
return _subscribers.getWeight();
}
- public Long getReceivedMessageCount()
+ public long getReceivedMessageCount()
{
return _totalMessagesReceived;
}
- public Integer getMaximumMessageCount()
+ public int getMaximumMessageCount()
{
- return _maxMessageCount;
+ return _maximumMessageCount;
}
- public void setMaximumMessageCount(Integer value)
+ public void setMaximumMessageCount(int value)
{
- _maxMessageCount = value;
+ _maximumMessageCount = value;
}
- public Long getMaximumQueueDepth()
+ public long getMaximumQueueDepth()
{
- return _maxQueueDepth;
+ return _maximumQueueDepth;
}
// Sets the queue depth, the max queue size
- public void setMaximumQueueDepth(Long value)
+ public void setMaximumQueueDepth(long value)
{
- _maxQueueDepth = value;
+ _maximumQueueDepth = value;
+ }
+
+ public long getOldestMessageArrivalTime()
+ {
+ return _deliveryMgr.getOldestMessageArrival();
}
/**
@@ -374,11 +377,11 @@
Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal);
- if(subscription.hasFilters())
+ if (subscription.hasFilters())
{
if (_deliveryMgr.hasQueuedMessages())
{
- _deliveryMgr.populatePreDeliveryQueue(subscription);
+ _deliveryMgr.populatePreDeliveryQueue(subscription);
}
}
@@ -551,6 +554,27 @@
}
}
+ public long getMinimumAlertRepeatGap()
+ {
+ return _minimumAlertRepeatGap;
+ }
+
+ public void setMinimumAlertRepeatGap(long minimumAlertRepeatGap)
+ {
+ _minimumAlertRepeatGap = minimumAlertRepeatGap;
+ }
+
+ public long getMaximumMessageAge()
+ {
+ return _maximumMessageAge;
+ }
+
+ public void setMaximumMessageAge(long maximumMessageAge)
+ {
+ _maximumMessageAge = maximumMessageAge;
+ }
+
+
private class Deliver implements TxnOp
{
private final AMQMessage _msg;
@@ -590,5 +614,6 @@
{
}
}
+
}
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?view=diff&rev=503703&r1=503702&r2=503703
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Mon Feb 5 06:43:14 2007
@@ -25,6 +25,7 @@
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.mina.common.ByteBuffer;
+import org.apache.log4j.Logger;
import javax.management.openmbean.*;
import javax.management.JMException;
@@ -41,8 +42,11 @@
* for an AMQQueue.
*/
@MBeanDescription("Management Interface for AMQQueue")
-public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
+public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener
{
+
+ private static final Logger _logger = Logger.getLogger(AMQQueueMBean.class);
+
private AMQQueue _queue = null;
private String _queueName = null;
// OpenMBean data types for viewMessages method
@@ -51,12 +55,14 @@
private static OpenType[] _msgAttributeTypes = new OpenType[4]; // AMQ message attribute types.
private static CompositeType _messageDataType = null; // Composite type for representing AMQ Message data.
private static TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list.
-
+
// OpenMBean data types for viewMessageContent method
private static CompositeType _msgContentType = null;
private final static String[] _msgContentAttributes = {"AMQ MessageId", "MimeType", "Encoding", "Content"};
private static OpenType[] _msgContentAttributeTypes = new OpenType[4];
-
+
+ private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
+
@MBeanConstructor("Creates an MBean exposing an AMQQueue")
public AMQQueueMBean(AMQQueue queue) throws JMException
{
@@ -71,7 +77,7 @@
{
init();
}
- catch(JMException ex)
+ catch (JMException ex)
{
// It should never occur
System.out.println(ex.getMessage());
@@ -88,7 +94,7 @@
_msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding
_msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content
_msgContentType = new CompositeType("Message Content", "AMQ Message Content", _msgContentAttributes,
- _msgContentAttributes, _msgContentAttributeTypes);
+ _msgContentAttributes, _msgContentAttributeTypes);
_msgAttributeTypes[0] = SimpleType.LONG; // For message id
_msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING); // For header attributes
@@ -215,35 +221,31 @@
*/
public void checkForNotification(AMQMessage msg)
{
- // Check for threshold message count
- Integer msgCount = getMessageCount();
- if (msgCount >= getMaximumMessageCount())
- {
- notifyClients("Message count(" + msgCount + ") has reached or exceeded the threshold high value");
- }
+ final long currentTime = System.currentTimeMillis();
+ final long thresholdTime = currentTime - _queue.getMinimumAlertRepeatGap();
- // Check for threshold message size
- long messageSize = getMessageSize(msg);
- if (messageSize >= _queue.getMaximumMessageSize())
+ for (NotificationCheck check : NotificationCheck.values())
{
- notifyClients("Message size(ID=" + msg.getMessageId() + ", size=" + messageSize + " bytes) is higher than the threshold value");
- }
-
- // Check for threshold queue depth in bytes
- long queueDepth = getQueueDepth();
- if (queueDepth >= _queue.getMaximumQueueDepth())
- {
- notifyClients("Queue depth(" + queueDepth + "), Queue size has reached the threshold high value");
+ if (check.isMessageSpecific() || _lastNotificationTimes[check.ordinal()] < thresholdTime)
+ {
+ if (check.notifyIfNecessary(msg, _queue, this))
+ {
+ _lastNotificationTimes[check.ordinal()] = currentTime;
+ }
+ }
}
}
/**
* Sends the notification to the listeners
*/
- private void notifyClients(String notificationMsg)
+ public void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg)
{
+ // important : add log to the log file - monitoring tools may be looking for this
+ _logger.info(notification.name() + " On Queue " + queue.getName() + " - " + notificationMsg);
+
Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
- ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg);
+ ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg);
_broadcaster.sendNotification(n);
}
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=503703&r1=503702&r2=503703
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Mon Feb 5 06:43:14 2007
@@ -34,6 +34,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
/**
@@ -76,7 +77,7 @@
* Lock is used to control access to hasQueuedMessages() and over the addition of messages to the queue.
*/
private ReentrantLock _lock = new ReentrantLock();
-
+ private AtomicLong _totalMessageSize = new AtomicLong();
ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
{
@@ -109,6 +110,8 @@
_messages.offer(msg);
+ _totalMessageSize.addAndGet(msg.getSize());
+
return true;
}
@@ -142,6 +145,17 @@
return _messages.size();
}
+ public long getTotalMessageSize()
+ {
+ return _totalMessageSize.get();
+ }
+
+ public long getOldestMessageArrival()
+ {
+ AMQMessage msg = _messages.peek();
+ return msg == null ? Long.MAX_VALUE : msg.getArrivalTime();
+ }
+
public synchronized List<AMQMessage> getMessages()
{
@@ -173,6 +187,7 @@
if (msg != null)
{
msg.dequeue(_queue);
+ _totalMessageSize.addAndGet(-msg.getSize());
}
}
@@ -182,6 +197,7 @@
while (msg != null)
{
msg.dequeue(_queue);
+ _totalMessageSize.set(0L);
msg = poll();
}
}
@@ -222,6 +238,7 @@
//remove sent message from our queue.
messageQueue.poll();
+ _totalMessageSize.addAndGet(-message.getSize());
}
catch (FailedDequeueException e)
{
@@ -308,7 +325,7 @@
//Pre Deliver to all subscriptions
if (_log.isDebugEnabled())
{
- _log.debug(id() + "We have " + _subscriptions.getSubscriptions().size() +
+ _log.debug(id() + "We have " + _subscriptions.getSubscriptions().size() +
" subscribers to give the message to.");
}
for (Subscription sub : _subscriptions.getSubscriptions())
@@ -330,7 +347,7 @@
{
if (_log.isDebugEnabled())
{
- _log.debug(id() + "Queuing message(" + System.identityHashCode(msg) +
+ _log.debug(id() + "Queuing message(" + System.identityHashCode(msg) +
") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")");
}
sub.enqueueForPreDelivery(msg);
@@ -345,7 +362,7 @@
if (_log.isDebugEnabled())
{
- _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" +
+ _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" +
System.identityHashCode(s) + ") :" + s);
}
//Deliver the message
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java?view=diff&rev=503703&r1=503702&r2=503703
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java Mon Feb 5 06:43:14 2007
@@ -64,7 +64,8 @@
*
* @param name the name of the entity on whose behalf we are delivering the message
* @param msg the message to deliver
- * @throws org.apache.qpid.server.queue.FailedDequeueException if the message could not be dequeued
+ * @throws org.apache.qpid.server.queue.FailedDequeueException
+ * if the message could not be dequeued
*/
void deliver(String name, AMQMessage msg) throws FailedDequeueException;
@@ -75,4 +76,8 @@
List<AMQMessage> getMessages();
void populatePreDeliveryQueue(Subscription subscription);
+
+ long getTotalMessageSize();
+
+ long getOldestMessageArrival();
}
Added: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java?view=auto&rev=503703
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java (added)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java Mon Feb 5 06:43:14 2007
@@ -0,0 +1,135 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+
+public enum NotificationCheck
+{
+
+ MESSAGE_COUNT_ALERT
+ {
+ boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ {
+ int msgCount = queue.getMessageCount();
+ final int maximumMessageCount = queue.getMaximumMessageCount();
+ if (maximumMessageCount!= 0 && msgCount >= maximumMessageCount)
+ {
+ listener.notifyClients(this, queue, msgCount + ": Maximum count on queue threshold ("+ maximumMessageCount +") breached.");
+ return true;
+ }
+ return false;
+ }
+ },
+ MESSAGE_SIZE_ALERT(true)
+ {
+ boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ {
+ final long maximumMessageSize = queue.getMaximumMessageSize();
+ if(maximumMessageSize != 0)
+ {
+ // Check for threshold message size
+ long messageSize;
+// try
+// {
+ messageSize = (msg == null) ? 0 : msg.getContentHeaderBody().bodySize;
+// }
+// catch (AMQException e)
+// {
+// messageSize = 0;
+// }
+
+
+ if (messageSize >= maximumMessageSize)
+ {
+ listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageId() + "]");
+ return true;
+ }
+ }
+ return false;
+ }
+
+ },
+ QUEUE_DEPTH_ALERT
+ {
+ boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ {
+ // Check for threshold queue depth in bytes
+ final long maximumQueueDepth = queue.getMaximumQueueDepth();
+
+ if(maximumQueueDepth != 0)
+ {
+ final long queueDepth = queue.getQueueDepth();
+
+ if (queueDepth >= maximumQueueDepth)
+ {
+ listener.notifyClients(this, queue, (queueDepth>>10) + "Kb : Maximum queue depth threshold ("+(maximumQueueDepth>>10)+"Kb) breached.");
+ return true;
+ }
+ }
+ return false;
+ }
+
+ },
+ MESSAGE_AGE_ALERT
+ {
+ boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ {
+
+ final long maxMessageAge = queue.getMaximumMessageAge();
+ if(maxMessageAge != 0)
+ {
+ final long currentTime = System.currentTimeMillis();
+ final long thresholdTime = currentTime - maxMessageAge;
+ final long firstArrivalTime = queue.getOldestMessageArrivalTime();
+
+ if(firstArrivalTime < thresholdTime)
+ {
+ long oldestAge = currentTime - firstArrivalTime;
+ listener.notifyClients(this, queue, (oldestAge/1000) + "s : Maximum age on queue threshold ("+(maxMessageAge /1000)+"s) breached.");
+
+ return true;
+ }
+ }
+ return false;
+
+ }
+
+ }
+ ;
+
+ private final boolean _messageSpecific;
+
+ NotificationCheck()
+ {
+ this(false);
+ }
+
+ NotificationCheck(boolean messageSpecific)
+ {
+ _messageSpecific = messageSpecific;
+ }
+
+ public boolean isMessageSpecific()
+ {
+ return _messageSpecific;
+ }
+
+ abstract boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener);
+
+}
Propchange: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java?view=auto&rev=503703
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java (added)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java Mon Feb 5 06:43:14 2007
@@ -0,0 +1,23 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+public interface QueueNotificationListener
+{
+ void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg);
+}
Propchange: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java?view=diff&rev=503703&r1=503702&r2=503703
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java Mon Feb 5 06:43:14 2007
@@ -197,7 +197,7 @@
try
{
- _logger.error("Send additional messages");
+ _logger.info("Send additional messages");
for (int msg = 0; msg < MSG_COUNT; msg++)
{
Modified: incubator/qpid/branches/perftesting/qpid/java/perftests/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/perftests/pom.xml?view=diff&rev=503703&r1=503702&r2=503703
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/perftests/pom.xml (original)
+++ incubator/qpid/branches/perftesting/qpid/java/perftests/pom.xml Mon Feb 5 06:43:14 2007
@@ -44,7 +44,7 @@
<repository>
<id>junit-toolkit.snapshots</id>
<name>JUnit Toolkit SNAPSHOT Repository</name>
- <url>file://${basedir}/../mvn-repo</url>
+ <url>http://junit-toolkit.svn.sourceforge.net/svnroot/junit-toolkit/snapshots/</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
@@ -56,7 +56,7 @@
<pluginRepository>
<id>junit-toolkit-plugin.snapshots</id>
<name>JUnit Toolkit SNAPSHOT Repository</name>
- <url>file://${basedir}/../mvn-repo</url>
+ <url>http://junit-toolkit.svn.sourceforge.net/svnroot/junit-toolkit/snapshots/</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
@@ -148,19 +148,7 @@
<name>amqj.test.logging.level</name>
<value>info</value>
</property>
- <property>
- <name>logdir</name>
- <value>$QPID_WORK/results</value>
- </property>
- <property>
- <name>-Xms</name>
- <value>256m</value>
- </property>
- <property>
- <name>-Xmx</name>
- <value>512m</value>
- </property>
- </systemproperties>
+ </systemproperties>
<commands>
<!-- Single pings. These can be scaled up by overriding the parameters when calling the test script. -->
@@ -200,147 +188,19 @@
-n Ping-Failover-After-Commit -s [100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf CommitBatchSize=10 FailAfterCommit=true
</Ping-Failover-After-Commit>
-
- <!-- P2P Volume Tests. -->
- <VT-Qpid-1>-n VT-Qpid-1 -s [15000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 transacted=true</VT-Qpid-1>
- <VT-Qpid-2>-n VT-Qpid-2 -s [15000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000</VT-Qpid-2>
- <!-- Setting sample to 3,000,000 will result in a log entry every 10 minutes so should have 144 data points for the run. -->
- <VT-Qpid-3>-n VT-Qpid-3 -s [3000000] -d 24H -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true rate=10000 BatchSize=3000000 transacted=true</VT-Qpid-3>
- <VT-Qpid-4>-n VT-Qpid-4 -s [3000000] -d 24H -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true rate=10000 BatchSize=3000000</VT-Qpid-4>
-
- <!-- P2P Scalability Tests. -->
- <!-- 250,000 Total, 1P-1T-1C -->
- <PT-Qpid-1>-n PT-Qpid-1 -s [250000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true transacted=true</PT-Qpid-1>
- <PT-Qpid-2>-n PT-Qpid-2 -s [250000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true </PT-Qpid-2>
-
- <!-- 25000 Msgs * 10 Brokers = 250,000 Total, 10P-1Q-10C -->
- <PT-Qpid-3>-n PT-Qpid-3 -s [25000] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true transacted=true</PT-Qpid-3>
- <PT-Qpid-4>-n PT-Qpid-4 -s [25000] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true </PT-Qpid-4>
-
- <!-- 25000 Msgs * 10 Brokers = 250,000 Tota,l 10P-10T-10C 10*(1P-1Q-1C) -->
- <PT-Qpid-5>-n PT-Qpid-5 -s [25000] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true destinationcount=1 transacted=true</PT-Qpid-5>
- <PT-Qpid-6>-n PT-Qpid-6 -s [25000] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true destinationcount=1</PT-Qpid-6>
-
- <!-- 2500 Msgs * 10 Brokers * 10 Topics/Clients = 250,000 Total, 10P-100T-10C 10*(1P-10T-1C) -->
- <PT-Qpid-7>-n PT-Qpid-7 -s [2500] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true destinationcount=10 transacted=true</PT-Qpid-7>
- <PT-Qpid-8>-n PT-Qpid-8 -s [2500] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true destinationcount=10</PT-Qpid-8>
-
- <!-- 2500 Msgs * 100 Brokers = 250,000 Total, 100P-100T-100C 100*(1P-1T-1C) -->
- <PT-Qpid-9>-n PT-Qpid-9 -s [2500] -c[100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=250 destinationcount=1 transacted=true CommitBatchSize=500</PT-Qpid-9>
- <PT-Qpid-10>-n PT-Qpid-10 -s [2500] -c[100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=250 destinationcount=1</PT-Qpid-10>
-
- <!-- 250 Msgs * 100 Brokers * 10 Clients = 250,000 Total, 100P-1000T-100C 100*(1P-10T-1C) -->
- <PT-Qpid-11>-n PT-Qpid-11 -s [250] -c[100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=250 destinationcount=10 transacted=true CommitBatchSize=50</PT-Qpid-11>
- <PT-Qpid-12>-n PT-Qpid-12 -s [250] -c[100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=250 destinationcount=10</PT-Qpid-12>
-
- <!-- 250 Msgs * 1000 Brokers = 250,000 Total, 1000P-1000T-1000C 1000*(1P-1T-1C) -->
- <PT-Qpid-13>-n PT-Qpid-13 -s [250] -c[1000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=250 destinationcount=1 transacted=true CommitBatchSize=50</PT-Qpid-13>
- <PT-Qpid-14>-n PT-Qpid-14 -s [250] -c[1000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=250 destinationcount=1</PT-Qpid-14>
-
- <!-- P2P Volume Tests. -->
- <VQ-Qpid-1>-n VQ-Qpid-1 -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 transacted=true</VQ-Qpid-1>
- <VQ-Qpid-2>-n VQ-Qpid-2 -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000</VQ-Qpid-2>
- <!-- Setting sample to 3,000,000 will result in a log entry every 10 minutes so should have 144 data points for the run. -->
- <VQ-Qpid-3>-n VQ-Qpid-3 -s [3000000] -d 24H -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf rate=10000 BatchSize=3000000 transacted=true</VQ-Qpid-3>
- <VQ-Qpid-4>-n VQ-Qpid-4 -s [3000000] -d 24H -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf rate=10000 BatchSize=3000000 </VQ-Qpid-4>
-
- <!-- P2P Scalability Tests. -->
- <!-- 15,000 Total, 1P-1Q-1C -->
- <PQ-Qpid-1>-n PQ-Qpid-1 -s [15000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf transacted=true</PQ-Qpid-1>
- <PQ-Qpid-2>-n PQ-Qpid-2 -s [15000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf</PQ-Qpid-2>
-
- <!-- 1500 Messages * 10 Brokers = 15,000 Total, 10P-1Q-10C -->
- <PQ-Qpid-3>-n PQ-Qpid-3 -s [1500] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=100 destinationname=ping transacted=true CommitBatchSize=500</PQ-Qpid-3>
- <PQ-Qpid-4>-n PQ-Qpid-4 -s [1500] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=100 destinationname=ping</PQ-Qpid-4>
-
- <!-- 1500 Messages * 10 Brokers = 15,000 Total, 10P-10Q-10C 10*(1P-1Q-1C) -->
- <PQ-Qpid-5>-n PQ-Qpid-5 -s [1500] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=100 destinationcount=1 transacted=true CommitBatchSize=500</PQ-Qpid-5>
- <PQ-Qpid-6>-n PQ-Qpid-6 -s [1500] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=100 destinationcount=1</PQ-Qpid-6>
-
- <!-- 1500 Messages * 10 Brokers = 15,000 Total, 10P-100Q-10C 10*(1P-10Q-1C) -->
- <PQ-Qpid-7>-n PQ-Qpid-7 -s [1500] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=100 destinationcount=10 transacted=true CommitBatchSize=500</PQ-Qpid-7>
- <PQ-Qpid-8>-n PQ-Qpid-8 -s [1500] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=100 destinationcount=10</PQ-Qpid-8>
-
- <!-- 150 Messages * 100 Brokers = 15,000 Total, 100P-100Q-100C 100*(1P-1Q-1C) -->
- <PQ-Qpid-9>-n PQ-Qpid-9 -s [150] -c[100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=50 destinationcount=1 transacted=true CommitBatchSize=50</PQ-Qpid-9>
- <PQ-Qpid-10>-n PQ-Qpid-10 -s [150] -c[100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=50 destinationcount=1 </PQ-Qpid-10>
-
- <!-- 150 Messages * 100 Brokers = 15,000 Total, 100P-1000Q-100C 100*(1P-10Q-1C) -->
- <PQ-Qpid-11>-n PQ-Qpid-11 -s [150] -c[100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=100 destinationcount=10 transacted=true CommitBatchSize=50</PQ-Qpid-11>
- <PQ-Qpid-12>-n PQ-Qpid-12 -s [150] -c[100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=100 destinationcount=10</PQ-Qpid-12>
-
- <!-- 15 Messages * 1000 Brokers = 15,000 Total, 1000P-1000Q-1000C 1000*(1P-1Q-1C) -->
- <PQ-Qpid-13>-n PQ-Qpid-13 -s [15] -c[1000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=15 transacted=true CommitBatchSize=15</PQ-Qpid-13>
- <PQ-Qpid-14>-n PQ-Qpid-14 -s [15] -c[1000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=15 </PQ-Qpid-14>
-
- <!-- Increasing Message Payload Tests. -->
- <!-- Topic Testing -->
- <LT-Qpid-1-512b>-n LT-Qpid-1-512b -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=512 transacted=true</LT-Qpid-1-512b>
- <LT-Qpid-2-512b>-n LT-Qpid-2-512b -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=512</LT-Qpid-2-512b>
-
- <LT-Qpid-1-1K>-n LT-Qpid-1-1K -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 transacted=true</LT-Qpid-1-1K>
- <LT-Qpid-2-1K>-n LT-Qpid-2-1K -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000</LT-Qpid-2-1K>
-
- <LT-Qpid-1-5K>-n LT-Qpid-1-5K -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=5120 transacted=true</LT-Qpid-1-5K>
- <LT-Qpid-2-5K>-n LT-Qpid-2-5K -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=5120</LT-Qpid-2-5K>
-
- <LT-Qpid-1-10K>-n LT-Qpid-1-10K -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=10240 transacted=true</LT-Qpid-1-10K>
- <LT-Qpid-2-10K>-n LT-Qpid-2-10K -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=10240 </LT-Qpid-2-10K>
-
- <LT-Qpid-1-50K>-n LT-Qpid-1-50K -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=51200 transacted=true</LT-Qpid-1-50K>
- <LT-Qpid-2-50K>-n LT-Qpid-2-50K -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=51200</LT-Qpid-2-50K>
-
- <LT-Qpid-1-100K>-n LT-Qpid-1-100K -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=102400 transacted=true</LT-Qpid-1-100K>
- <LT-Qpid-2-100K>-n LT-Qpid-2-100K -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=102400</LT-Qpid-2-100K>
-
- <LT-Qpid-1-1M>-n LT-Qpid-1-1M -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=1048576 transacted=true</LT-Qpid-1-1M>
- <LT-Qpid-2-1M>-n LT-Qpid-2-1M -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=1048476</LT-Qpid-2-1M>
-
- <!-- Queue Testing -->
- <LT-Qpid-3-512b>-n LT-Qpid-3-512b -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=512 transacted=true</LT-Qpid-3-512b>
- <LT-Qpid-4-512b>-n LT-Qpid-4-512b -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=512</LT-Qpid-4-512b>
-
- <LT-Qpid-3-1K>-n LT-Qpid-3-1K -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 transacted=true</LT-Qpid-3-1K>
- <LT-Qpid-4-1K>-n LT-Qpid-4-1K -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000</LT-Qpid-4-1K>
-
- <LT-Qpid-3-5K>-n LT-Qpid-3-5K -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=5120 transacted=true</LT-Qpid-3-5K>
- <LT-Qpid-4-5K>-n LT-Qpid-4-5K -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=5120</LT-Qpid-4-5K>
-
- <LT-Qpid-3-10K>-n LT-Qpid-3-10K -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=10240 transacted=true</LT-Qpid-3-10K>
- <LT-Qpid-4-10K>-n LT-Qpid-4-10K -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=10240</LT-Qpid-4-10K>
-
- <LT-Qpid-3-50K>-n LT-Qpid-3-50K -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=51200 transacted=true</LT-Qpid-3-50K>
- <LT-Qpid-4-50K>-n LT-Qpid-4-50K -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=51200</LT-Qpid-4-50K>
-
- <LT-Qpid-3-100K>-n LT-Qpid-3-100K -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=102400 transacted=true</LT-Qpid-3-100K>
- <LT-Qpid-4-100K>-n LT-Qpid-4-100K -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=102400</LT-Qpid-4-100K>
-
- <LT-Qpid-3-1M>-n LT-Qpid-3-1M -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=1048576 transacted=true</LT-Qpid-3-1M>
- <LT-Qpid-4-1M>-n LT-Qpid-4-1M -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=1048576 </LT-Qpid-4-1M>
-
- <!-- Failover Tests. -->
- <!-- Transactional -->
- <FT-Qpid-1>-n FT-Qpid-1 -s [250000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 transacted=true broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" FailBeforeSend=true</FT-Qpid-1>
- <FT-Qpid-2>-n FT-Qpid-2 -s [250000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 transacted=true broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" FailAfterSend=true</FT-Qpid-2>
- <FT-Qpid-3>-n FT-Qpid-3 -s [250000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 transacted=true broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" FailAfterCommit=true</FT-Qpid-3>
-
- <!-- Non Transactional -->
- <FT-Qpid-4>-n FT-Qpid-4 -s [250000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" transacted=false FailBeforeSend=true</FT-Qpid-4>
- <FT-Qpid-5>-n FT-Qpid-5 -s [250000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" transacted=false FailAfterSend=true</FT-Qpid-5>
-
-
- </commands>
- </configuration>
-
- <executions>
- <execution>
- <phase>test</phase>
- <!--<goals>
- <goal>tktest</goal>
- </goals>-->
- </execution>
- </executions>
- </plugin>
+ </commands>
+ </configuration>
+
+ <executions>
+ <execution>
+ <phase>test</phase>
+ <!--<goals>
+ <goal>tktest</goal>
+ </goals>-->
+ </execution>
+ </executions>
+ </plugin>
+
<!-- Bundles all the dependencies, fully expanded into a single jar, required to run the tests.
Usefull when bundling system, integration or performance tests into a convenient
Modified: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java?view=diff&rev=503703&r1=503702&r2=503703
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java Mon Feb 5 06:43:14 2007
@@ -38,6 +38,8 @@
*/
public class PingClient extends PingPongProducer
{
+ private static int _pingClientCount;
+
/**
* Creates a ping producer with the specified parameters, of which there are many. See their individual comments
* for details. This constructor creates ping pong producer but de-registers its reply-to destination message
@@ -76,6 +78,8 @@
super(brokerDetails, username, password, virtualpath, destinationName, selector, transacted, persistent, messageSize,
verbose, afterCommit, beforeCommit, afterSend, beforeSend, failOnce, txBatchSize, noOfDestinations, rate,
pubsub, unique);
+
+ _pingClientCount++;
}
/**
@@ -88,4 +92,17 @@
{
return _pingDestinations;
}
+
+ public int getConsumersPerTopic()
+ {
+ if (_isUnique)
+ {
+ return 1;
+ }
+ else
+ {
+ return _pingClientCount;
+ }
+ }
+
}