You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2012/08/03 14:14:12 UTC
svn commit: r1368910 [24/27] - in /qpid/branches/asyncstore: ./ bin/
cpp/bindings/qmf/ruby/ cpp/bindings/qmf2/ruby/ cpp/bindings/qpid/python/
cpp/bindings/qpid/ruby/ cpp/bindings/qpid/ruby/features/
cpp/bindings/qpid/ruby/features/step_definitions/ cpp...
Modified: qpid/branches/asyncstore/java/perftests/visualisation-jfc/src/main/java/org/apache/qpid/disttest/charting/chartbuilder/ChartBuilderFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/perftests/visualisation-jfc/src/main/java/org/apache/qpid/disttest/charting/chartbuilder/ChartBuilderFactory.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/perftests/visualisation-jfc/src/main/java/org/apache/qpid/disttest/charting/chartbuilder/ChartBuilderFactory.java (original)
+++ qpid/branches/asyncstore/java/perftests/visualisation-jfc/src/main/java/org/apache/qpid/disttest/charting/chartbuilder/ChartBuilderFactory.java Fri Aug 3 12:13:32 2012
@@ -20,18 +20,27 @@
package org.apache.qpid.disttest.charting.chartbuilder;
import org.apache.qpid.disttest.charting.ChartType;
+import org.apache.qpid.disttest.charting.seriesbuilder.SeriesBuilder;
public class ChartBuilderFactory
{
- public static ChartBuilder createChartBuilder(ChartType chartType)
+ public static ChartBuilder createChartBuilder(ChartType chartType, SeriesBuilder seriesBuilder)
{
switch (chartType)
{
case LINE:
- return new LineChartBuilder();
+ return new LineChartBuilder(seriesBuilder);
+ case LINE3D:
+ return new LineChart3DBuilder(seriesBuilder);
case BAR:
- return new BarChartBuilder();
+ return new BarChartBuilder(seriesBuilder);
+ case BAR3D:
+ return new BarChart3DBuilder(seriesBuilder);
+ case XYLINE:
+ return new XYLineChartBuilder(seriesBuilder);
+ case STATISTICAL_BAR:
+ return new StatisticalBarCharBuilder(seriesBuilder);
default:
throw new IllegalArgumentException("Unknown chart type " + chartType);
}
Modified: qpid/branches/asyncstore/java/perftests/visualisation-jfc/src/main/java/org/apache/qpid/disttest/charting/chartbuilder/LineChartBuilder.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/perftests/visualisation-jfc/src/main/java/org/apache/qpid/disttest/charting/chartbuilder/LineChartBuilder.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/perftests/visualisation-jfc/src/main/java/org/apache/qpid/disttest/charting/chartbuilder/LineChartBuilder.java (original)
+++ qpid/branches/asyncstore/java/perftests/visualisation-jfc/src/main/java/org/apache/qpid/disttest/charting/chartbuilder/LineChartBuilder.java Fri Aug 3 12:13:32 2012
@@ -19,22 +19,30 @@
*/
package org.apache.qpid.disttest.charting.chartbuilder;
+import org.apache.qpid.disttest.charting.seriesbuilder.SeriesBuilder;
import org.jfree.chart.ChartFactory;
import org.jfree.chart.JFreeChart;
import org.jfree.chart.plot.PlotOrientation;
-import org.jfree.data.category.DefaultCategoryDataset;
+import org.jfree.data.category.CategoryDataset;
+import org.jfree.data.general.Dataset;
-public class LineChartBuilder extends DataSetBasedChartBuilder
+public class LineChartBuilder extends CategoryDataSetBasedChartBuilder
{
+
+ public LineChartBuilder(SeriesBuilder seriesBuilder)
+ {
+ super(seriesBuilder);
+ }
+
@Override
public JFreeChart createChartImpl(String title, String xAxisTitle,
- String yAxisTitle, final DefaultCategoryDataset dataset, PlotOrientation plotOrientation,
+ String yAxisTitle, final Dataset dataset, PlotOrientation plotOrientation,
boolean showLegend, boolean showToolTips, boolean showUrls)
{
JFreeChart chart = ChartFactory.createLineChart(title,
xAxisTitle,
yAxisTitle,
- dataset,
+ (CategoryDataset)dataset,
plotOrientation,
showLegend,
showToolTips,
Modified: qpid/branches/asyncstore/java/perftests/visualisation-jfc/src/main/java/org/apache/qpid/disttest/charting/definition/ChartingDefinition.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/perftests/visualisation-jfc/src/main/java/org/apache/qpid/disttest/charting/definition/ChartingDefinition.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/perftests/visualisation-jfc/src/main/java/org/apache/qpid/disttest/charting/definition/ChartingDefinition.java (original)
+++ qpid/branches/asyncstore/java/perftests/visualisation-jfc/src/main/java/org/apache/qpid/disttest/charting/definition/ChartingDefinition.java Fri Aug 3 12:13:32 2012
@@ -29,6 +29,7 @@ public class ChartingDefinition
private final String _chartStemName;
private final ChartType _chartType;
private final String _chartTitle;
+ private final String _chartSubtitle;
private final String _xaxisTitle;
private final String _yaxisTitle;
private final List<SeriesDefinition> _seriesDefinitions;
@@ -37,12 +38,13 @@ public class ChartingDefinition
public ChartingDefinition(final String chartStemName,
final ChartType chartType,
final String chartTitle,
- final String xaxisTitle,
- final String yaxisTitle, List<SeriesDefinition> seriesDefinitions)
+ final String chartSubtitle,
+ final String xaxisTitle, final String yaxisTitle, List<SeriesDefinition> seriesDefinitions)
{
_chartStemName = chartStemName;
_chartType = chartType;
_chartTitle = chartTitle;
+ _chartSubtitle = chartSubtitle;
_xaxisTitle = xaxisTitle;
_yaxisTitle = yaxisTitle;
_seriesDefinitions = seriesDefinitions;
@@ -58,6 +60,11 @@ public class ChartingDefinition
return _chartTitle;
}
+ public String getChartSubtitle()
+ {
+ return _chartSubtitle;
+ }
+
public String getXAxisTitle()
{
@@ -82,4 +89,5 @@ public class ChartingDefinition
return Collections.unmodifiableList(_seriesDefinitions);
}
+
}
Modified: qpid/branches/asyncstore/java/perftests/visualisation-jfc/src/main/java/org/apache/qpid/disttest/charting/definition/ChartingDefinitionCreator.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/perftests/visualisation-jfc/src/main/java/org/apache/qpid/disttest/charting/definition/ChartingDefinitionCreator.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/perftests/visualisation-jfc/src/main/java/org/apache/qpid/disttest/charting/definition/ChartingDefinitionCreator.java (original)
+++ qpid/branches/asyncstore/java/perftests/visualisation-jfc/src/main/java/org/apache/qpid/disttest/charting/definition/ChartingDefinitionCreator.java Fri Aug 3 12:13:32 2012
@@ -38,6 +38,7 @@ public class ChartingDefinitionCreator
public static final String CHART_TYPE_KEY = "chartType";
public static final String CHART_TITLE_KEY = "chartTitle";
+ public static final String CHART_SUBTITLE_KEY = "chartSubtitle";
public static final String XAXIS_TITLE_KEY = "xAxisTitle";
public static final String YAXIS_TITLE_KEY = "yAxisTitle";
@@ -80,6 +81,7 @@ public class ChartingDefinitionCreator
final ChartType chartType = ChartType.valueOf(props.getProperty(CHART_TYPE_KEY));
final String chartTitle = props.getProperty(CHART_TITLE_KEY);
+ final String chartSubtitle = props.getProperty(CHART_SUBTITLE_KEY);
final String xAxisTitle = props.getProperty(XAXIS_TITLE_KEY);
final String yAxisTitle = props.getProperty(YAXIS_TITLE_KEY);
@@ -88,9 +90,9 @@ public class ChartingDefinitionCreator
final ChartingDefinition chartDefinition = new ChartingDefinition(chartStemName,
chartType,
chartTitle,
+ chartSubtitle,
xAxisTitle,
- yAxisTitle,
- seriesDefinitions);
+ yAxisTitle, seriesDefinitions);
return chartDefinition;
}
catch (IOException e)
@@ -134,7 +136,4 @@ public class ChartingDefinitionCreator
return pathname.isFile() && pathname.getName().endsWith(CHARTDEF_FILE_EXTENSION);
}
}
-
-
-
}
Modified: qpid/branches/asyncstore/java/perftests/visualisation-jfc/src/test/java/org/apache/qpid/disttest/charting/chartbuilder/ChartBuilderFactoryTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/perftests/visualisation-jfc/src/test/java/org/apache/qpid/disttest/charting/chartbuilder/ChartBuilderFactoryTest.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/perftests/visualisation-jfc/src/test/java/org/apache/qpid/disttest/charting/chartbuilder/ChartBuilderFactoryTest.java (original)
+++ qpid/branches/asyncstore/java/perftests/visualisation-jfc/src/test/java/org/apache/qpid/disttest/charting/chartbuilder/ChartBuilderFactoryTest.java Fri Aug 3 12:13:32 2012
@@ -19,22 +19,44 @@
*/
package org.apache.qpid.disttest.charting.chartbuilder;
+import static org.mockito.Mockito.*;
+
import org.apache.qpid.disttest.charting.ChartType;
+import org.apache.qpid.disttest.charting.seriesbuilder.SeriesBuilder;
import junit.framework.TestCase;
public class ChartBuilderFactoryTest extends TestCase
{
+ private SeriesBuilder _seriesBuilder = mock(SeriesBuilder.class);
+
public void testLineChart()
{
- ChartBuilder builder = ChartBuilderFactory.createChartBuilder(ChartType.LINE);
+ ChartBuilder builder = ChartBuilderFactory.createChartBuilder(ChartType.LINE, _seriesBuilder);
assertTrue(builder instanceof LineChartBuilder);
}
+ public void testLineChart3D()
+ {
+ ChartBuilder builder = ChartBuilderFactory.createChartBuilder(ChartType.LINE3D, _seriesBuilder);
+ assertTrue(builder instanceof LineChart3DBuilder);
+ }
+
public void testBarChart()
{
- ChartBuilder builder = ChartBuilderFactory.createChartBuilder(ChartType.BAR);
+ ChartBuilder builder = ChartBuilderFactory.createChartBuilder(ChartType.BAR, _seriesBuilder);
assertTrue(builder instanceof BarChartBuilder);
}
+ public void testBarChart3D()
+ {
+ ChartBuilder builder = ChartBuilderFactory.createChartBuilder(ChartType.BAR3D, _seriesBuilder);
+ assertTrue(builder instanceof BarChart3DBuilder);
+ }
+
+ public void testXYLineChart()
+ {
+ ChartBuilder builder = ChartBuilderFactory.createChartBuilder(ChartType.XYLINE, _seriesBuilder);
+ assertTrue(builder instanceof XYLineChartBuilder);
+ }
}
Modified: qpid/branches/asyncstore/java/perftests/visualisation-jfc/src/test/java/org/apache/qpid/disttest/charting/definition/ChartingDefinitionCreatorTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/perftests/visualisation-jfc/src/test/java/org/apache/qpid/disttest/charting/definition/ChartingDefinitionCreatorTest.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/perftests/visualisation-jfc/src/test/java/org/apache/qpid/disttest/charting/definition/ChartingDefinitionCreatorTest.java (original)
+++ qpid/branches/asyncstore/java/perftests/visualisation-jfc/src/test/java/org/apache/qpid/disttest/charting/definition/ChartingDefinitionCreatorTest.java Fri Aug 3 12:13:32 2012
@@ -20,6 +20,7 @@
package org.apache.qpid.disttest.charting.definition;
import static org.apache.qpid.disttest.charting.definition.ChartingDefinitionCreator.CHART_TITLE_KEY;
+import static org.apache.qpid.disttest.charting.definition.ChartingDefinitionCreator.CHART_SUBTITLE_KEY;
import static org.apache.qpid.disttest.charting.definition.ChartingDefinitionCreator.CHART_TYPE_KEY;
import static org.apache.qpid.disttest.charting.definition.ChartingDefinitionCreator.XAXIS_TITLE_KEY;
import static org.apache.qpid.disttest.charting.definition.ChartingDefinitionCreator.YAXIS_TITLE_KEY;
@@ -38,6 +39,7 @@ import org.apache.qpid.disttest.charting
public class ChartingDefinitionCreatorTest extends TestCase
{
private static final String TEST_CHART_TITLE = "CHART_TITLE";
+ private static final String TEST_CHART_SUBTITLE = "CHART_SUBTITLE";
private static final String TEST_XAXIS_TITLE = "XAXIS_TITLE";
private static final String TEST_YAXIS_TITLE = "YAXIS_TITLE";
private static final ChartType TEST_CHART_TYPE = ChartType.LINE;
@@ -83,6 +85,7 @@ public class ChartingDefinitionCreatorTe
ChartingDefinition definition1 = definitions.get(0);
assertEquals(TEST_CHART_TITLE, definition1.getChartTitle());
+ assertEquals(TEST_CHART_SUBTITLE, definition1.getChartSubtitle());
assertEquals(TEST_XAXIS_TITLE, definition1.getXAxisTitle());
assertEquals(TEST_YAXIS_TITLE, definition1.getYAxisTitle());
assertEquals(TEST_CHART_TYPE, definition1.getChartType());
@@ -121,6 +124,7 @@ public class ChartingDefinitionCreatorTe
Properties props = new Properties();
props.setProperty(CHART_TYPE_KEY, TEST_CHART_TYPE.name());
props.setProperty(CHART_TITLE_KEY, TEST_CHART_TITLE);
+ props.setProperty(CHART_SUBTITLE_KEY, TEST_CHART_SUBTITLE);
props.setProperty(XAXIS_TITLE_KEY, TEST_XAXIS_TITLE);
props.setProperty(YAXIS_TITLE_KEY, TEST_YAXIS_TITLE);
Modified: qpid/branches/asyncstore/java/systests/build.xml
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/systests/build.xml?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/systests/build.xml (original)
+++ qpid/branches/asyncstore/java/systests/build.xml Fri Aug 3 12:13:32 2012
@@ -19,12 +19,17 @@ nn - or more contributor license agreeme
-
-->
<project name="System Tests" default="build">
-
<condition property="systests.optional.depends" value="bdbstore" else="">
+ <or>
<and>
<contains string="${modules.opt}" substring="bdbstore"/>
<contains string="${profile}" substring="bdb"/>
</and>
+ <and>
+ <istrue value="${optional}"/>
+ <contains string="${profile}" substring="bdb"/>
+ </and>
+ </or>
</condition>
<property name="module.depends" value="client management/common broker broker/test common amqp-1-0-common common/test jca ${systests.optional.depends}"/>
Modified: qpid/branches/asyncstore/java/systests/etc/config-systests-bdb.xml
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/systests/etc/config-systests-bdb.xml?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/systests/etc/config-systests-bdb.xml (original)
+++ qpid/branches/asyncstore/java/systests/etc/config-systests-bdb.xml Fri Aug 3 12:13:32 2012
@@ -25,6 +25,5 @@
<xml fileName="${QPID_HOME}/${test.config}" optional="true"/>
<xml fileName="${QPID_HOME}/etc/config-systests-bdb-settings.xml"/>
<xml fileName="${QPID_HOME}/etc/config-systests-settings.xml"/>
- <xml fileName="${QPID_HOME}/etc/config.xml"/>
</override>
</configuration>
Modified: qpid/branches/asyncstore/java/systests/etc/config-systests-derby.xml
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/systests/etc/config-systests-derby.xml?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/systests/etc/config-systests-derby.xml (original)
+++ qpid/branches/asyncstore/java/systests/etc/config-systests-derby.xml Fri Aug 3 12:13:32 2012
@@ -25,6 +25,5 @@
<xml fileName="${QPID_HOME}/${test.config}" optional="true"/>
<xml fileName="${QPID_HOME}/etc/config-systests-derby-settings.xml"/>
<xml fileName="${QPID_HOME}/etc/config-systests-settings.xml"/>
- <xml fileName="${QPID_HOME}/etc/config.xml"/>
</override>
</configuration>
Modified: qpid/branches/asyncstore/java/systests/etc/config-systests-firewall.xml
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/systests/etc/config-systests-firewall.xml?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/systests/etc/config-systests-firewall.xml (original)
+++ qpid/branches/asyncstore/java/systests/etc/config-systests-firewall.xml Fri Aug 3 12:13:32 2012
@@ -26,6 +26,5 @@
<xml fileName="${QPID_FIREWALL_CONFIG_SETTINGS}" optional="true"/>
<xml fileName="${QPID_HOME}/etc/config-systests-firewall-settings.xml"/>
<xml fileName="${QPID_HOME}/etc/config-systests-settings.xml"/>
- <xml fileName="${QPID_HOME}/etc/config.xml"/>
</override>
</configuration>
Modified: qpid/branches/asyncstore/java/systests/etc/config-systests-settings.xml
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/systests/etc/config-systests-settings.xml?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/systests/etc/config-systests-settings.xml (original)
+++ qpid/branches/asyncstore/java/systests/etc/config-systests-settings.xml Fri Aug 3 12:13:32 2012
@@ -20,7 +20,17 @@
-
-->
<broker>
+ <prefix>${QPID_HOME}</prefix>
+ <work>${QPID_WORK}</work>
+ <conf>${prefix}/etc</conf>
+
+ <plugin-directory>${QPID_HOME}/lib/plugins</plugin-directory>
+ <cache-directory>${QPID_WORK}/cache</cache-directory>
+
<connector>
+ <!-- To enable SSL edit the keystorePath and keystorePassword
+ and set enabled to true.
+ To disable Non-SSL port set sslOnly to true -->
<ssl>
<port>15671</port>
<enabled>false</enabled>
@@ -28,14 +38,66 @@
<keyStorePath>${QPID_HOME}/../test-profiles/test_resources/ssl/java_broker_keystore.jks</keyStorePath>
<keyStorePassword>password</keyStorePassword>
</ssl>
+ <port>5672</port>
+ <socketReceiveBuffer>262144</socketReceiveBuffer>
+ <socketSendBuffer>262144</socketSendBuffer>
</connector>
<management>
<enabled>false</enabled>
+ <jmxport>
+ <registryServer>8999</registryServer>
+ <!--
+ If unspecified, connectorServer defaults to 100 + registryServer port.
+ <connectorServer>9099</connectorServer>
+ -->
+ </jmxport>
<ssl>
<enabled>false</enabled>
<keyStorePath>${QPID_HOME}/../test-profiles/test_resources/ssl/java_broker_keystore.jks</keyStorePath>
<keyStorePassword>password</keyStorePassword>
</ssl>
+ <http>
+ <enabled>false</enabled>
+ </http>
</management>
+ <advanced>
+ <framesize>65535</framesize>
+ <locale>en_US</locale>
+ </advanced>
+
+ <security>
+ <pd-auth-manager>
+ <principal-database>
+ <class>org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase</class>
+ <attributes>
+ <attribute>
+ <name>passwordFile</name>
+ <value>${conf}/passwd</value>
+ </attribute>
+ </attributes>
+ </principal-database>
+ </pd-auth-manager>
+
+ <!-- By default, all authenticated users have permissions to perform all actions -->
+
+ <!-- ACL Example
+ This example illustrates securing both Management (JMX) and Messaging.
+ <acl>${conf}/broker_example.acl</acl>
+ -->
+
+ <msg-auth>false</msg-auth>
+ </security>
+
<virtualhosts>${QPID_HOME}/etc/virtualhosts-systests.xml</virtualhosts>
+
+ <heartbeat>
+ <delay>0</delay>
+ <timeoutFactor>2.0</timeoutFactor>
+ </heartbeat>
+ <queue>
+ <auto_register>true</auto_register>
+ </queue>
+
+ <status-updates>ON</status-updates>
+
</broker>
Modified: qpid/branches/asyncstore/java/systests/etc/config-systests.xml
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/systests/etc/config-systests.xml?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/systests/etc/config-systests.xml (original)
+++ qpid/branches/asyncstore/java/systests/etc/config-systests.xml Fri Aug 3 12:13:32 2012
@@ -24,6 +24,5 @@
<override>
<xml fileName="${QPID_HOME}/${test.config}" optional="true"/>
<xml fileName="${QPID_HOME}/etc/config-systests-settings.xml"/>
- <xml fileName="${QPID_HOME}/etc/config.xml"/>
</override>
</configuration>
Modified: qpid/branches/asyncstore/java/systests/etc/virtualhosts-systests-bdb-settings.xml
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/systests/etc/virtualhosts-systests-bdb-settings.xml?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/systests/etc/virtualhosts-systests-bdb-settings.xml (original)
+++ qpid/branches/asyncstore/java/systests/etc/virtualhosts-systests-bdb-settings.xml Fri Aug 3 12:13:32 2012
@@ -26,7 +26,7 @@
<name>localhost</name>
<localhost>
<store>
- <factoryclass>org.apache.qpid.server.store.berkeleydb.BDBMessageStoreFactory</factoryclass>
+ <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>
<environment-path>${work}/bdbstore/localhost-store</environment-path>
</store>
</localhost>
@@ -36,7 +36,7 @@
<name>development</name>
<development>
<store>
- <factoryclass>org.apache.qpid.server.store.berkeleydb.BDBMessageStoreFactory</factoryclass>
+ <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>
<environment-path>${work}/bdbstore/development-store</environment-path>
</store>
</development>
@@ -46,7 +46,7 @@
<name>test</name>
<test>
<store>
- <factoryclass>org.apache.qpid.server.store.berkeleydb.BDBMessageStoreFactory</factoryclass>
+ <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>
<environment-path>${work}/bdbstore/test-store</environment-path>
</store>
</test>
Modified: qpid/branches/asyncstore/java/systests/etc/virtualhosts-systests-bdb.xml
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/systests/etc/virtualhosts-systests-bdb.xml?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/systests/etc/virtualhosts-systests-bdb.xml (original)
+++ qpid/branches/asyncstore/java/systests/etc/virtualhosts-systests-bdb.xml Fri Aug 3 12:13:32 2012
@@ -24,6 +24,6 @@
<override>
<xml fileName="${QPID_HOME}/${test.virtualhosts}" optional="true"/>
<xml fileName="${QPID_HOME}/etc/virtualhosts-systests-bdb-settings.xml"/>
- <xml fileName="${QPID_HOME}/etc/virtualhosts.xml"/>
+ <xml fileName="${QPID_HOME}/etc/virtualhosts-systests-settings.xml"/>
</override>
</configuration>
Modified: qpid/branches/asyncstore/java/systests/etc/virtualhosts-systests-derby-settings.xml
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/systests/etc/virtualhosts-systests-derby-settings.xml?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/systests/etc/virtualhosts-systests-derby-settings.xml (original)
+++ qpid/branches/asyncstore/java/systests/etc/virtualhosts-systests-derby-settings.xml Fri Aug 3 12:13:32 2012
@@ -26,7 +26,7 @@
<virtualhost>
<localhost>
<store>
- <factoryclass>org.apache.qpid.server.store.derby.DerbyMessageStoreFactory</factoryclass>
+ <class>org.apache.qpid.server.store.derby.DerbyMessageStore</class>
<environment-path>${QPID_WORK}/derbyDB/localhost-store</environment-path>
</store>
</localhost>
@@ -35,7 +35,7 @@
<virtualhost>
<development>
<store>
- <factoryclass>org.apache.qpid.server.store.derby.DerbyMessageStoreFactory</factoryclass>
+ <class>org.apache.qpid.server.store.derby.DerbyMessageStore</class>
<environment-path>${QPID_WORK}/derbyDB/development-store</environment-path>
</store>
</development>
@@ -44,7 +44,7 @@
<virtualhost>
<test>
<store>
- <factoryclass>org.apache.qpid.server.store.derby.DerbyMessageStoreFactory</factoryclass>
+ <class>org.apache.qpid.server.store.derby.DerbyMessageStore</class>
<environment-path>${QPID_WORK}/derbyDB/test-store</environment-path>
</store>
</test>
Modified: qpid/branches/asyncstore/java/systests/etc/virtualhosts-systests-derby.xml
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/systests/etc/virtualhosts-systests-derby.xml?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/systests/etc/virtualhosts-systests-derby.xml (original)
+++ qpid/branches/asyncstore/java/systests/etc/virtualhosts-systests-derby.xml Fri Aug 3 12:13:32 2012
@@ -24,6 +24,6 @@
<override>
<xml fileName="${QPID_HOME}/${test.virtualhosts}" optional="true"/>
<xml fileName="${QPID_HOME}/etc/virtualhosts-systests-derby-settings.xml"/>
- <xml fileName="${QPID_HOME}/etc/virtualhosts.xml"/>
+ <xml fileName="${QPID_HOME}/etc/virtualhosts-systests-settings.xml"/>
</override>
</configuration>
Modified: qpid/branches/asyncstore/java/systests/etc/virtualhosts-systests-firewall-2.xml
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/systests/etc/virtualhosts-systests-firewall-2.xml?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/systests/etc/virtualhosts-systests-firewall-2.xml (original)
+++ qpid/branches/asyncstore/java/systests/etc/virtualhosts-systests-firewall-2.xml Fri Aug 3 12:13:32 2012
@@ -26,7 +26,7 @@
<name>test</name>
<test>
<store>
- <factoryclass>org.apache.qpid.server.store.MemoryMessageStoreFactory</factoryclass>
+ <class>org.apache.qpid.server.store.MemoryMessageStore</class>
</store>
</test>
</virtualhost>
@@ -35,7 +35,7 @@
<name>test2</name>
<test2>
<store>
- <factoryclass>org.apache.qpid.server.store.MemoryMessageStoreFactory</factoryclass>
+ <class>org.apache.qpid.server.store.MemoryMessageStore</class>
</store>
<security>
<firewall default-action="deny">
Modified: qpid/branches/asyncstore/java/systests/etc/virtualhosts-systests-firewall-3.xml
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/systests/etc/virtualhosts-systests-firewall-3.xml?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/systests/etc/virtualhosts-systests-firewall-3.xml (original)
+++ qpid/branches/asyncstore/java/systests/etc/virtualhosts-systests-firewall-3.xml Fri Aug 3 12:13:32 2012
@@ -26,7 +26,7 @@
<name>test</name>
<test>
<store>
- <factoryclass>org.apache.qpid.server.store.MemoryMessageStoreFactory</factoryclass>
+ <class>org.apache.qpid.server.store.MemoryMessageStore</class>
</store>
</test>
</virtualhost>
@@ -35,7 +35,7 @@
<name>test2</name>
<test2>
<store>
- <factoryclass>org.apache.qpid.server.store.MemoryMessageStoreFactory</factoryclass>
+ <class>org.apache.qpid.server.store.MemoryMessageStore</class>
</store>
<security>
<firewall default-action="deny"/>
Modified: qpid/branches/asyncstore/java/systests/etc/virtualhosts-systests-firewall.xml
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/systests/etc/virtualhosts-systests-firewall.xml?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/systests/etc/virtualhosts-systests-firewall.xml (original)
+++ qpid/branches/asyncstore/java/systests/etc/virtualhosts-systests-firewall.xml Fri Aug 3 12:13:32 2012
@@ -24,6 +24,6 @@
<override>
<xml fileName="${QPID_HOME}/${test.virtualhosts}" optional="true"/>
<xml fileName="${QPID_FIREWALL_VIRTUALHOSTS_SETTINGS}" optional="true"/>
- <xml fileName="${QPID_HOME}/etc/virtualhosts.xml"/>
+ <xml fileName="${QPID_HOME}/etc/virtualhosts-systests-settings.xml"/>
</override>
</configuration>
Modified: qpid/branches/asyncstore/java/systests/etc/virtualhosts-systests.xml
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/systests/etc/virtualhosts-systests.xml?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/systests/etc/virtualhosts-systests.xml (original)
+++ qpid/branches/asyncstore/java/systests/etc/virtualhosts-systests.xml Fri Aug 3 12:13:32 2012
@@ -23,6 +23,6 @@
<system/>
<override>
<xml fileName="${QPID_HOME}/${test.virtualhosts}" optional="true"/>
- <xml fileName="${QPID_HOME}/etc/virtualhosts.xml"/>
+ <xml fileName="${QPID_HOME}/etc/virtualhosts-systests-settings.xml"/>
</override>
</configuration>
Modified: qpid/branches/asyncstore/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java (original)
+++ qpid/branches/asyncstore/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java Fri Aug 3 12:13:32 2012
@@ -19,7 +19,13 @@
package org.apache.qpid.client.failover;
import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.jms.FailoverPolicy;
import org.apache.qpid.test.utils.FailoverBaseCase;
@@ -36,10 +42,14 @@ import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TransactionRolledBackException;
+import javax.naming.NamingException;
+
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Enumeration;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -760,6 +770,181 @@ public class FailoverBehaviourTest exten
//got started, before allowing the test to tear down
awaitForFailoverCompletion(DEFAULT_FAILOVER_TIME);
}
+
+ /**
+ * This test only covers the failover timeout for the 0-8/0-9/0-9-1 protocols.
+ */
+ public void testFailoverHandlerTimeoutExpires() throws Exception
+ {
+ _connection.close();
+ setTestSystemProperty("qpid.failover_method_timeout", "10000");
+ AMQConnection connection = null;
+ try
+ {
+ connection = createConnectionWithFailover();
+
+ // holding failover mutex should prevent the failover from proceeding
+ synchronized(connection.getFailoverMutex())
+ {
+ killBroker();
+ startBroker();
+
+ // sleep interval exceeds failover timeout interval
+ Thread.sleep(11000l);
+ }
+
+ // allows the failover thread to proceed
+ Thread.yield();
+ assertFalse("Unexpected failover", _failoverComplete.await(2000l, TimeUnit.MILLISECONDS));
+ assertTrue("Failover should not succeed due to timeout", connection.isClosed());
+ }
+ finally
+ {
+ if (connection != null)
+ {
+ connection.close();
+ }
+ }
+ }
+
+ public void testFailoverHandlerTimeoutReconnected() throws Exception
+ {
+ _connection.close();
+ setTestSystemProperty("qpid.failover_method_timeout", "10000");
+ AMQConnection connection = null;
+ try
+ {
+ connection = createConnectionWithFailover();
+
+ // holding failover mutex should prevent the failover from proceeding
+ synchronized(connection.getFailoverMutex())
+ {
+ killBroker();
+ startBroker();
+ }
+
+ // allows the failover thread to proceed
+ Thread.yield();
+ awaitForFailoverCompletion(DEFAULT_FAILOVER_TIME);
+ assertFalse("Failover should restore connectivity", connection.isClosed());
+ }
+ finally
+ {
+ if (connection != null)
+ {
+ connection.close();
+ }
+ }
+ }
+
+ /**
+ * Tests that the producer flow control flag is reset when failover occurs while
+ * the producers are being blocked by the broker.
+ *
+ * Uses Java broker specific queue configuration to enable producer-side flow control (PSFC).
+ */
+ public void testFlowControlFlagResetOnFailover() throws Exception
+ {
+ // we do not need the connection failing to second broker
+ _connection.close();
+
+ // make sure that failover timeout is bigger than flow control timeout
+ setTestSystemProperty("qpid.failover_method_timeout", "60000");
+ setTestSystemProperty("qpid.flow_control_wait_failure", "10000");
+
+ AMQConnection connection = null;
+ try
+ {
+ connection = createConnectionWithFailover();
+
+ final Session producerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+ final Queue queue = createAndBindQueueWithFlowControlEnabled(producerSession, getTestQueueName(), DEFAULT_MESSAGE_SIZE * 3, DEFAULT_MESSAGE_SIZE * 2);
+ final AtomicInteger counter = new AtomicInteger();
+ // try to send 5 messages (should block after 4)
+ new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ MessageProducer producer = producerSession.createProducer(queue);
+ for (int i=0; i < 5; i++)
+ {
+ Message next = createNextMessage(producerSession, i);
+ producer.send(next);
+ producerSession.commit();
+ counter.incrementAndGet();
+ }
+ }
+ catch(Exception e)
+ {
+ // ignore
+ }
+ }
+ }).start();
+
+ long limit= 30000l;
+ long start = System.currentTimeMillis();
+
+ // wait until session is blocked
+ while(!((AMQSession<?,?>)producerSession).isFlowBlocked() && System.currentTimeMillis() - start < limit)
+ {
+ Thread.sleep(100l);
+ }
+
+ assertTrue("Flow is not blocked", ((AMQSession<?, ?>) producerSession).isFlowBlocked());
+ // Message counter could be 3 or 4 depending on the progression of producing thread relative
+ // to the receipt of the ChannelFlow.
+ final int currentCounter = counter.get();
+ assertTrue("Unexpected number of sent messages", currentCounter == 3 || currentCounter == 4);
+
+ killBroker();
+ startBroker();
+
+ // allows the failover thread to proceed
+ Thread.yield();
+ awaitForFailoverCompletion(60000l);
+
+ assertFalse("Flow is blocked", ((AMQSession<?, ?>) producerSession).isFlowBlocked());
+ }
+ finally
+ {
+ if (connection != null)
+ {
+ connection.close();
+ }
+ }
+ }
+
+ private Queue createAndBindQueueWithFlowControlEnabled(Session session, String queueName, int capacity, int resumeCapacity) throws Exception
+ {
+ final Map<String, Object> arguments = new HashMap<String, Object>();
+ arguments.put("x-qpid-capacity", capacity);
+ arguments.put("x-qpid-flow-resume-capacity", resumeCapacity);
+ ((AMQSession<?, ?>) session).createQueue(new AMQShortString(queueName), true, true, false, arguments);
+ Queue queue = session.createQueue("direct://amq.direct/" + queueName + "/" + queueName + "?durable='" + true
+ + "'&autodelete='" + true + "'");
+ ((AMQSession<?, ?>) session).declareAndBind((AMQDestination) queue);
+ return queue;
+ }
+
+ private AMQConnection createConnectionWithFailover() throws NamingException, JMSException
+ {
+ AMQConnection connection;
+ AMQConnectionFactory connectionFactory = (AMQConnectionFactory)getConnectionFactory("default");
+ ConnectionURL connectionURL = connectionFactory.getConnectionURL();
+ connectionURL.setOption(ConnectionURL.OPTIONS_FAILOVER, "singlebroker");
+ connectionURL.setOption(ConnectionURL.OPTIONS_FAILOVER_CYCLE, "2");
+ BrokerDetails details = connectionURL.getBrokerDetails(0);
+ details.setProperty(BrokerDetails.OPTIONS_RETRY, "200");
+ details.setProperty(BrokerDetails.OPTIONS_CONNECT_DELAY, "1000");
+
+ connection = (AMQConnection)connectionFactory.createConnection("admin", "admin");
+ connection.setConnectionListener(this);
+ return connection;
+ }
+
/**
* Tests {@link Session#close()} for session with given acknowledge mode
* to ensure that close works after failover.
Modified: qpid/branches/asyncstore/java/systests/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/systests/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageTest.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/systests/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageTest.java (original)
+++ qpid/branches/asyncstore/java/systests/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageTest.java Fri Aug 3 12:13:32 2012
@@ -1,4 +1,3 @@
-package org.apache.qpid.client.message;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -19,6 +18,7 @@ package org.apache.qpid.client.message;
* under the License.
*
*/
+package org.apache.qpid.client.message;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
Modified: qpid/branches/asyncstore/java/systests/src/main/java/org/apache/qpid/client/ssl/SSLTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/systests/src/main/java/org/apache/qpid/client/ssl/SSLTest.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/systests/src/main/java/org/apache/qpid/client/ssl/SSLTest.java (original)
+++ qpid/branches/asyncstore/java/systests/src/main/java/org/apache/qpid/client/ssl/SSLTest.java Fri Aug 3 12:13:32 2012
@@ -46,6 +46,7 @@ public class SSLTest extends QpidBrokerT
setTestClientSystemProperty("profile.use_ssl", "true");
setConfigurationProperty("connector.ssl.enabled", "true");
setConfigurationProperty("connector.ssl.sslOnly", "true");
+ setConfigurationProperty("connector.ssl.wantClientAuth", "true");
}
// set the ssl system properties
Modified: qpid/branches/asyncstore/java/systests/src/main/java/org/apache/qpid/jms/xa/XAResourceTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/systests/src/main/java/org/apache/qpid/jms/xa/XAResourceTest.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/systests/src/main/java/org/apache/qpid/jms/xa/XAResourceTest.java (original)
+++ qpid/branches/asyncstore/java/systests/src/main/java/org/apache/qpid/jms/xa/XAResourceTest.java Fri Aug 3 12:13:32 2012
@@ -24,21 +24,72 @@ import org.apache.qpid.client.AMQConnect
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import org.apache.qpid.util.FileUtils;
+import org.apache.qpid.test.unit.xa.AbstractXATestCase;
+import org.apache.qpid.client.AMQXAResource;
+
+import org.apache.qpid.dtx.XidImpl;
import javax.jms.XAConnection;
import javax.jms.XAConnectionFactory;
import javax.jms.XASession;
+import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
-public class XAResourceTest extends QpidBrokerTestCase
+public class XAResourceTest extends AbstractXATestCase
{
private static final String FACTORY_NAME = "default";
private static final String ALT_FACTORY_NAME = "connection2";
+ public void init() throws Exception
+ {
+ }
+
+ public void testIsSameRMJoin() throws Exception
+ {
+ XAConnectionFactory factory = getConnectionFactory(FACTORY_NAME);
+ XAConnection conn1 = factory.createXAConnection("guest", "guest");
+ XAConnection conn2 = factory.createXAConnection("guest", "guest");
+ XAConnection conn3 = factory.createXAConnection("guest", "guest");
+
+ XASession session1 = conn1.createXASession();
+ XASession session2 = conn2.createXASession();
+ XASession session3 = conn3.createXASession();
+
+ AMQXAResource xaResource1 = (AMQXAResource)session1.getXAResource();
+ AMQXAResource xaResource2 = (AMQXAResource)session2.getXAResource();
+ AMQXAResource xaResource3 = (AMQXAResource)session3.getXAResource();
+
+ Xid xid = getNewXid();
+
+ xaResource1.start(xid, XAResource.TMNOFLAGS);
+ assertTrue("XAResource isSameRM", xaResource1.isSameRM(xaResource2));
+ xaResource2.start(xid, XAResource.TMJOIN);
+ assertTrue("AMQXAResource siblings should be 1", xaResource1.getSiblings().size() == 1);
+
+ assertTrue("AMQXAResource TMJOIN resource siblings should be 0", xaResource2.getSiblings().size() == 0);
+
+ assertTrue("XAResource isSameRM", xaResource2.isSameRM(xaResource3));
+
+
+ xaResource3.start(xid, XAResource.TMJOIN);
+ assertTrue("AMQXAResource siblings should be 1", xaResource2.getSiblings().size() == 1);
+
+ xaResource1.end(xid, XAResource.TMSUCCESS);
+ assertTrue("AMQXAResource TMJOIN resource siblings should be 0", xaResource1.getSiblings().size() == 0);
+
+ xaResource1.prepare(xid);
+ xaResource1.commit(xid, false);
+
+ conn3.close();
+ conn2.close();
+ conn1.close();
+ }
+
/*
* Test with multiple XAResources originating from the same connection factory. XAResource(s) will be equal,
- * as they originate from the same session.
+ * as they originate from the same session.
*/
public void testIsSameRMSingleCF() throws Exception
{
@@ -47,14 +98,14 @@ public class XAResourceTest extends Qpid
XASession session = conn.createXASession();
XAResource xaResource1 = session.getXAResource();
XAResource xaResource2 = session.getXAResource();
-
+
assertEquals("XAResource objects not equal", xaResource1, xaResource2);
assertTrue("isSameRM not true for identical objects", xaResource1.isSameRM(xaResource2));
-
+
session.close();
conn.close();
}
-
+
/*
* Test with multiple XAResources originating from different connection factory's and different sessions. XAResources will not be
* equal as they do not originate from the same session. As the UUID from the broker will be the same, isSameRM will be true.
@@ -67,11 +118,11 @@ public class XAResourceTest extends Qpid
XAConnectionFactory factory = new AMQConnectionFactory(url);
XAConnectionFactory factory2 = new AMQConnectionFactory(url);
XAConnectionFactory factory3 = getConnectionFactory(ALT_FACTORY_NAME);
-
+
XAConnection conn = factory.createXAConnection("guest","guest");
XAConnection conn2 = factory2.createXAConnection("guest","guest");
XAConnection conn3 = factory3.createXAConnection("guest","guest");
-
+
XASession session = conn.createXASession();
XASession session2 = conn2.createXASession();
XASession session3 = conn3.createXASession();
@@ -79,14 +130,14 @@ public class XAResourceTest extends Qpid
XAResource xaResource1 = session.getXAResource();
XAResource xaResource2 = session2.getXAResource();
XAResource xaResource3 = session3.getXAResource();
-
+
assertFalse("XAResource objects should not be equal", xaResource1.equals(xaResource2));
assertTrue("isSameRM not true for identical objects", xaResource1.isSameRM(xaResource2));
assertFalse("isSameRM true for XA Resources created by two different brokers", xaResource1.isSameRM(xaResource3));
-
+
conn.close();
conn2.close();
- conn3.close();
+ conn3.close();
}
@Override
@@ -103,5 +154,5 @@ public class XAResourceTest extends Qpid
FileUtils.deleteDirectory(System.getProperty("QPID_WORK") + "/" + getFailingPort());
}
}
-
+
}
Modified: qpid/branches/asyncstore/java/systests/src/main/java/org/apache/qpid/server/SupportedProtocolVersionsTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/systests/src/main/java/org/apache/qpid/server/SupportedProtocolVersionsTest.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/systests/src/main/java/org/apache/qpid/server/SupportedProtocolVersionsTest.java (original)
+++ qpid/branches/asyncstore/java/systests/src/main/java/org/apache/qpid/server/SupportedProtocolVersionsTest.java Fri Aug 3 12:13:32 2012
@@ -38,6 +38,13 @@ public class SupportedProtocolVersionsTe
// No-op, we call super.setUp() from test methods after appropriate config overrides
}
+ private void clearProtocolSupportManipulations()
+ {
+ //Remove the QBTC provided protocol manipulations, giving only the protocols which default to enabled
+ setTestSystemProperty(QpidBrokerTestCase.BROKER_PROTOCOL_EXCLUDES, null);
+ setTestSystemProperty(QpidBrokerTestCase.BROKER_PROTOCOL_INCLUDES, null);
+ }
+
/**
* Test that 0-10, 0-9-1, 0-9, and 0-8 support is present when no
* attempt has yet been made to disable them, and forcing the client
@@ -46,7 +53,8 @@ public class SupportedProtocolVersionsTe
*/
public void testDefaultProtocolSupport() throws Exception
{
- //Start the broker without modifying its supported protocols
+ clearProtocolSupportManipulations();
+
super.setUp();
//Verify requesting a 0-10 connection works
@@ -74,11 +82,13 @@ public class SupportedProtocolVersionsTe
connection.close();
}
- public void testDisabling010() throws Exception
+ public void testDisabling010and10() throws Exception
{
- //disable 0-10 support
- setConfigurationProperty("connector.amqp10enabled", "false");
- setConfigurationProperty("connector.amqp010enabled", "false");
+ clearProtocolSupportManipulations();
+
+ //disable 0-10 and 1-0 support
+ setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP010ENABLED, "false");
+ setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP10ENABLED, "false");
super.setUp();
@@ -90,9 +100,11 @@ public class SupportedProtocolVersionsTe
connection.close();
}
- public void testDisabling091and010() throws Exception
+ public void testDisabling091and010and10() throws Exception
{
- //disable 0-91 and 0-10 support
+ clearProtocolSupportManipulations();
+
+ //disable 0-91 and 0-10 and 1-0 support
setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP10ENABLED, "false");
setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP010ENABLED, "false");
setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP091ENABLED, "false");
@@ -107,9 +119,11 @@ public class SupportedProtocolVersionsTe
connection.close();
}
- public void testDisabling09and091and010() throws Exception
+ public void testDisabling09and091and010and10() throws Exception
{
- //disable 0-9, 0-91 and 0-10 support
+ clearProtocolSupportManipulations();
+
+ //disable 0-9, 0-91, 0-10 and 1-0 support
setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP09ENABLED, "false");
setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP091ENABLED, "false");
setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP010ENABLED, "false");
@@ -127,6 +141,8 @@ public class SupportedProtocolVersionsTe
public void testConfiguringReplyingToUnsupported010ProtocolInitiationWith09insteadOf091() throws Exception
{
+ clearProtocolSupportManipulations();
+
//disable 0-10 support, and set the default unsupported protocol initiation reply to 0-9
setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP010ENABLED, "false");
setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP_SUPPORTED_REPLY, "v0_9");
@@ -147,4 +163,72 @@ public class SupportedProtocolVersionsTe
assertEquals("Unexpected protocol version in use", ProtocolVersion.v0_91, connection.getProtocolVersion());
connection.close();
}
+
+ public void testProtocolInclusionThroughQBTCSystemPropertiesOverridesProtocolExclusion() throws Exception
+ {
+ testProtocolInclusionOverridesProtocolExclusion(false);
+ }
+
+ public void testProtocolInclusionThroughConfigOverridesProtocolExclusion() throws Exception
+ {
+ testProtocolInclusionOverridesProtocolExclusion(true);
+ }
+
+ private void testProtocolInclusionOverridesProtocolExclusion(boolean useConfig) throws Exception
+ {
+ clearProtocolSupportManipulations();
+
+ //selectively exclude 0-10 and 1-0 on the test port
+ setTestSystemProperty(QpidBrokerTestCase.BROKER_PROTOCOL_EXCLUDES,"--exclude-0-10 @PORT --exclude-1-0 @PORT");
+
+ super.setUp();
+
+ //Verify initially requesting a 0-10 connection negotiates a 0-9-1 connection
+ setTestClientSystemProperty(ClientProperties.AMQP_VERSION, "0-10");
+ AMQConnection connection = (AMQConnection) getConnection();
+ assertEquals("Unexpected protocol version in use", ProtocolVersion.v0_91, connection.getProtocolVersion());
+ connection.close();
+
+ stopBroker();
+
+ if(useConfig)
+ {
+ //selectively include 0-10 support again on the test port through config
+ setConfigurationProperty(ServerConfiguration.CONNECTOR_INCLUDE_010, String.valueOf(getPort()));
+ }
+ else
+ {
+ //selectively include 0-10 support again on the test port through QBTC sys props
+ setTestSystemProperty(QpidBrokerTestCase.BROKER_PROTOCOL_INCLUDES,"--include-0-10 @PORT");
+ }
+
+ startBroker();
+
+ //Verify requesting a 0-10 connection now returns one
+ setTestClientSystemProperty(ClientProperties.AMQP_VERSION, "0-10");
+ connection = (AMQConnection) getConnection();
+ assertEquals("Unexpected protocol version in use", ProtocolVersion.v0_10, connection.getProtocolVersion());
+ connection.close();
+ }
+
+ public void testProtocolInclusionOverridesProtocolDisabling() throws Exception
+ {
+ clearProtocolSupportManipulations();
+
+ //disable 0-10 and 1-0
+ setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP010ENABLED, "false");
+ setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP10ENABLED, "false");
+
+ //selectively include 0-10 support again on the test port
+ setConfigurationProperty(ServerConfiguration.CONNECTOR_INCLUDE_010, String.valueOf(getPort()));
+
+ super.setUp();
+
+ //Verify initially requesting a 0-10 connection still works
+ setTestClientSystemProperty(ClientProperties.AMQP_VERSION, "0-10");
+ AMQConnection connection = (AMQConnection) getConnection();
+ assertEquals("Unexpected protocol version in use", ProtocolVersion.v0_10, connection.getProtocolVersion());
+ connection.close();
+ }
+
}
\ No newline at end of file
Propchange: qpid/branches/asyncstore/java/systests/src/main/java/org/apache/qpid/server/SupportedProtocolVersionsTest.java
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/SupportedProtocolVersionsTest.java:r1333988-1368650
Modified: qpid/branches/asyncstore/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java (original)
+++ qpid/branches/asyncstore/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java Fri Aug 3 12:13:32 2012
@@ -22,7 +22,8 @@
package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
-
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
@@ -37,82 +38,65 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
+
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
public class ConflationQueueTest extends QpidBrokerTestCase
{
- private static final int TIMEOUT = 1500;
-
-
- private static final Logger _logger = Logger.getLogger(ConflationQueueTest.class);
-
-
+ private static final Logger LOGGER = Logger.getLogger(ConflationQueueTest.class);
- protected final String VHOST = "/test";
- protected final String QUEUE = "ConflationQueue";
+ private static final String MESSAGE_SEQUENCE_NUMBER_PROPERTY = "msg";
+ private static final String KEY_PROPERTY = "key";
private static final int MSG_COUNT = 400;
- private Connection producerConnection;
- private MessageProducer producer;
- private Session producerSession;
- private Queue queue;
- private Connection consumerConnection;
- private Session consumerSession;
-
-
- private MessageConsumer consumer;
+ private String _queueName;
+ private Queue _queue;
+ private Connection _producerConnection;
+ private MessageProducer _producer;
+ private Session _producerSession;
+ private Connection _consumerConnection;
+ private Session _consumerSession;
+ private MessageConsumer _consumer;
protected void setUp() throws Exception
{
super.setUp();
- producerConnection = getConnection();
- producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- producerConnection.start();
-
-
- }
-
- protected void tearDown() throws Exception
- {
- producerConnection.close();
- consumerConnection.close();
- super.tearDown();
+ _queueName = getTestQueueName();
+ _producerConnection = getConnection();
+ _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
public void testConflation() throws Exception
{
- consumerConnection = getConnection();
- consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
+ _consumerConnection = getConnection();
+ _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put("qpid.last_value_queue_key","key");
- ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), false, true, false, arguments);
- queue = new org.apache.qpid.client.AMQQueue("amq.direct",QUEUE);
- ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
- producer = producerSession.createProducer(queue);
+ createConflationQueue(_producerSession);
+ _producer = _producerSession.createProducer(_queue);
for (int msg = 0; msg < MSG_COUNT; msg++)
{
- producer.send(nextMessage(msg, producerSession));
+ _producer.send(nextMessage(msg, _producerSession));
}
- producer.close();
- producerSession.close();
- producerConnection.close();
+ _producer.close();
+ _producerSession.close();
+ _producerConnection.close();
- consumer = consumerSession.createConsumer(queue);
- consumerConnection.start();
+ _consumer = _consumerSession.createConsumer(_queue);
+ _consumerConnection.start();
Message received;
List<Message> messages = new ArrayList<Message>();
- while((received = consumer.receive(1000))!=null)
+ while((received = _consumer.receive(1000))!=null)
{
messages.add(received);
}
@@ -122,40 +106,33 @@ public class ConflationQueueTest extends
for(int i = 0 ; i < 10; i++)
{
Message msg = messages.get(i);
- assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty("msg"));
+ assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
}
-
-
}
-
public void testConflationWithRelease() throws Exception
{
- consumerConnection = getConnection();
- consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ _consumerConnection = getConnection();
+ _consumerSession = _consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put("qpid.last_value_queue_key","key");
- ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), false, true, false, arguments);
- queue = new org.apache.qpid.client.AMQQueue("amq.direct",QUEUE);
- ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
- producer = producerSession.createProducer(queue);
+ createConflationQueue(_producerSession);
+ _producer = _producerSession.createProducer(_queue);
for (int msg = 0; msg < MSG_COUNT/2; msg++)
{
- producer.send(nextMessage(msg, producerSession));
+ _producer.send(nextMessage(msg, _producerSession));
}
// HACK to do something synchronous
- ((AMQSession)producerSession).sync();
+ ((AMQSession<?,?>)_producerSession).sync();
- consumer = consumerSession.createConsumer(queue);
- consumerConnection.start();
+ _consumer = _consumerSession.createConsumer(_queue);
+ _consumerConnection.start();
Message received;
List<Message> messages = new ArrayList<Message>();
- while((received = consumer.receive(1000))!=null)
+ while((received = _consumer.receive(1000))!=null)
{
messages.add(received);
}
@@ -165,31 +142,31 @@ public class ConflationQueueTest extends
for(int i = 0 ; i < 10; i++)
{
Message msg = messages.get(i);
- assertEquals("Unexpected message number received", MSG_COUNT/2 - 10 + i, msg.getIntProperty("msg"));
+ assertEquals("Unexpected message number received", MSG_COUNT/2 - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
}
- consumerSession.close();
- consumerConnection.close();
+ _consumerSession.close();
+ _consumerConnection.close();
- consumerConnection = getConnection();
- consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _consumerConnection = getConnection();
+ _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
for (int msg = MSG_COUNT/2; msg < MSG_COUNT; msg++)
{
- producer.send(nextMessage(msg, producerSession));
+ _producer.send(nextMessage(msg, _producerSession));
}
// HACK to do something synchronous
- ((AMQSession)producerSession).sync();
+ ((AMQSession<?,?>)_producerSession).sync();
- consumer = consumerSession.createConsumer(queue);
- consumerConnection.start();
+ _consumer = _consumerSession.createConsumer(_queue);
+ _consumerConnection.start();
messages = new ArrayList<Message>();
- while((received = consumer.receive(1000))!=null)
+ while((received = _consumer.receive(1000))!=null)
{
messages.add(received);
}
@@ -199,39 +176,34 @@ public class ConflationQueueTest extends
for(int i = 0 ; i < 10; i++)
{
Message msg = messages.get(i);
- assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty("msg"));
+ assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
}
}
-
public void testConflationWithReleaseAfterNewPublish() throws Exception
{
- consumerConnection = getConnection();
- consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ _consumerConnection = getConnection();
+ _consumerSession = _consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put("qpid.last_value_queue_key","key");
- ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), false, true, false, arguments);
- queue = new org.apache.qpid.client.AMQQueue("amq.direct",QUEUE);
- ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
- producer = producerSession.createProducer(queue);
+ createConflationQueue(_producerSession);
+ _producer = _producerSession.createProducer(_queue);
for (int msg = 0; msg < MSG_COUNT/2; msg++)
{
- producer.send(nextMessage(msg, producerSession));
+ _producer.send(nextMessage(msg, _producerSession));
}
// HACK to do something synchronous
- ((AMQSession)producerSession).sync();
+ ((AMQSession<?,?>)_producerSession).sync();
- consumer = consumerSession.createConsumer(queue);
- consumerConnection.start();
+ _consumer = _consumerSession.createConsumer(_queue);
+ _consumerConnection.start();
Message received;
List<Message> messages = new ArrayList<Message>();
- while((received = consumer.receive(1000))!=null)
+ while((received = _consumer.receive(1000))!=null)
{
messages.add(received);
}
@@ -241,35 +213,35 @@ public class ConflationQueueTest extends
for(int i = 0 ; i < 10; i++)
{
Message msg = messages.get(i);
- assertEquals("Unexpected message number received", MSG_COUNT/2 - 10 + i, msg.getIntProperty("msg"));
+ assertEquals("Unexpected message number received", MSG_COUNT/2 - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
}
- consumer.close();
+ _consumer.close();
for (int msg = MSG_COUNT/2; msg < MSG_COUNT; msg++)
{
- producer.send(nextMessage(msg, producerSession));
+ _producer.send(nextMessage(msg, _producerSession));
}
// HACK to do something synchronous
- ((AMQSession)producerSession).sync();
+ ((AMQSession<?,?>)_producerSession).sync();
// this causes the "old" messages to be released
- consumerSession.close();
- consumerConnection.close();
+ _consumerSession.close();
+ _consumerConnection.close();
- consumerConnection = getConnection();
- consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _consumerConnection = getConnection();
+ _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- consumer = consumerSession.createConsumer(queue);
- consumerConnection.start();
+ _consumer = _consumerSession.createConsumer(_queue);
+ _consumerConnection.start();
messages = new ArrayList<Message>();
- while((received = consumer.receive(1000))!=null)
+ while((received = _consumer.receive(1000))!=null)
{
messages.add(received);
}
@@ -279,40 +251,54 @@ public class ConflationQueueTest extends
for(int i = 0 ; i < 10; i++)
{
Message msg = messages.get(i);
- assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty("msg"));
+ assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
+ }
+
+ }
+
+ public void testConflatedQueueDepth() throws Exception
+ {
+ _consumerConnection = getConnection();
+ _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ createConflationQueue(_producerSession);
+ _producer = _producerSession.createProducer(_queue);
+
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ _producer.send(nextMessage(msg, _producerSession));
}
+ final long queueDepth = ((AMQSession<?, ?>)_producerSession).getQueueDepth((AMQDestination)_queue, true);
+
+ assertEquals(10, queueDepth);
}
public void testConflationBrowser() throws Exception
{
- consumerConnection = getConnection();
- consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _consumerConnection = getConnection();
+ _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put("qpid.last_value_queue_key","key");
- ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), false, true, false, arguments);
- queue = new org.apache.qpid.client.AMQQueue("amq.direct",QUEUE);
- ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
- producer = producerSession.createProducer(queue);
+ createConflationQueue(_producerSession);
+ _producer = _producerSession.createProducer(_queue);
for (int msg = 0; msg < MSG_COUNT; msg++)
{
- producer.send(nextMessage(msg, producerSession));
+ _producer.send(nextMessage(msg, _producerSession));
}
- ((AMQSession)producerSession).sync();
+ ((AMQSession<?,?>)_producerSession).sync();
- AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+QUEUE+"?browse='true'&durable='true'");
+ AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+_queueName+"?browse='true'&durable='true'");
AMQQueue browseQueue = new AMQQueue(url);
- consumer = consumerSession.createConsumer(browseQueue);
- consumerConnection.start();
+ _consumer = _consumerSession.createConsumer(browseQueue);
+ _consumerConnection.start();
Message received;
List<Message> messages = new ArrayList<Message>();
- while((received = consumer.receive(1000))!=null)
+ while((received = _consumer.receive(1000))!=null)
{
messages.add(received);
}
@@ -322,62 +308,53 @@ public class ConflationQueueTest extends
for(int i = 0 ; i < 10; i++)
{
Message msg = messages.get(i);
- assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty("msg"));
+ assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
}
messages.clear();
- producer.send(nextMessage(MSG_COUNT, producerSession));
+ _producer.send(nextMessage(MSG_COUNT, _producerSession));
- ((AMQSession)producerSession).sync();
+ ((AMQSession<?,?>)_producerSession).sync();
- while((received = consumer.receive(1000))!=null)
+ while((received = _consumer.receive(1000))!=null)
{
messages.add(received);
}
assertEquals("Unexpected number of messages received",1,messages.size());
- assertEquals("Unexpected message number received", MSG_COUNT, messages.get(0).getIntProperty("msg"));
-
-
- producer.close();
- producerSession.close();
- producerConnection.close();
-
+ assertEquals("Unexpected message number received", MSG_COUNT, messages.get(0).getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
+ _producer.close();
+ _producerSession.close();
+ _producerConnection.close();
}
-
public void testConflation2Browsers() throws Exception
{
- consumerConnection = getConnection();
- consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _consumerConnection = getConnection();
+ _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put("qpid.last_value_queue_key","key");
- ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), false, true, false, arguments);
- queue = new org.apache.qpid.client.AMQQueue("amq.direct",QUEUE);
- ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
- producer = producerSession.createProducer(queue);
+ createConflationQueue(_producerSession);
+ _producer = _producerSession.createProducer(_queue);
for (int msg = 0; msg < MSG_COUNT; msg++)
{
- producer.send(nextMessage(msg, producerSession));
-
+ _producer.send(nextMessage(msg, _producerSession));
}
- ((AMQSession)producerSession).sync();
+ ((AMQSession<?,?>)_producerSession).sync();
- AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+QUEUE+"?browse='true'&durable='true'");
+ AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+_queueName+"?browse='true'&durable='true'");
AMQQueue browseQueue = new AMQQueue(url);
- consumer = consumerSession.createConsumer(browseQueue);
- MessageConsumer consumer2 = consumerSession.createConsumer(browseQueue);
- consumerConnection.start();
+ _consumer = _consumerSession.createConsumer(browseQueue);
+ MessageConsumer consumer2 = _consumerSession.createConsumer(browseQueue);
+ _consumerConnection.start();
List<Message> messages = new ArrayList<Message>();
List<Message> messages2 = new ArrayList<Message>();
- Message received = consumer.receive(1000);
+ Message received = _consumer.receive(1000);
Message received2 = consumer2.receive(1000);
while(received!=null || received2!=null)
@@ -392,7 +369,7 @@ public class ConflationQueueTest extends
}
- received = consumer.receive(1000);
+ received = _consumer.receive(1000);
received2 = consumer2.receive(1000);
}
@@ -403,33 +380,195 @@ public class ConflationQueueTest extends
for(int i = 0 ; i < 10; i++)
{
Message msg = messages.get(i);
- assertEquals("Unexpected message number received on first browser", MSG_COUNT - 10 + i, msg.getIntProperty("msg"));
+ assertEquals("Unexpected message number received on first browser", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
msg = messages2.get(i);
- assertEquals("Unexpected message number received on second browser", MSG_COUNT - 10 + i, msg.getIntProperty("msg"));
+ assertEquals("Unexpected message number received on second browser", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
}
- producer.close();
- producerSession.close();
- producerConnection.close();
+ _producer.close();
+ _producerSession.close();
+ _producerConnection.close();
+ }
+
+ public void testParallelProductionAndConsumption() throws Exception
+ {
+ createConflationQueue(_producerSession);
+
+ // Start producing threads that send messages
+ BackgroundMessageProducer messageProducer1 = new BackgroundMessageProducer("Message sender1");
+ messageProducer1.startSendingMessages();
+ BackgroundMessageProducer messageProducer2 = new BackgroundMessageProducer("Message sender2");
+ messageProducer2.startSendingMessages();
+
+ Map<String, Integer> lastReceivedMessages = receiveMessages(messageProducer1);
+
+ messageProducer1.join();
+ messageProducer2.join();
+ final Map<String, Integer> lastSentMessages1 = messageProducer1.getMessageSequenceNumbersByKey();
+ assertEquals("Unexpected number of last sent messages sent by producer1", 2, lastSentMessages1.size());
+ final Map<String, Integer> lastSentMessages2 = messageProducer2.getMessageSequenceNumbersByKey();
+ assertEquals(lastSentMessages1, lastSentMessages2);
+ assertEquals("The last message sent for each key should match the last message received for that key",
+ lastSentMessages1, lastReceivedMessages);
+ assertNull("Unexpected exception from background producer thread", messageProducer1.getException());
}
+ private Map<String, Integer> receiveMessages(BackgroundMessageProducer producer) throws Exception
+ {
+ producer.waitUntilQuarterOfMessagesSentToEncourageConflation();
+ _consumerConnection = getConnection();
+ int smallPrefetchToEncourageConflation = 1;
+ _consumerSession = ((AMQConnection)_consumerConnection).createSession(false, Session.AUTO_ACKNOWLEDGE, smallPrefetchToEncourageConflation);
- private Message nextMessage(int msg, Session producerSession) throws JMSException
+ LOGGER.info("Starting to receive");
+
+ _consumer = _consumerSession.createConsumer(_queue);
+ _consumerConnection.start();
+
+ Map<String, Integer> messageSequenceNumbersByKey = new HashMap<String, Integer>();
+
+ Message message;
+ int numberOfShutdownsReceived = 0;
+ int numberOfMessagesReceived = 0;
+ while(numberOfShutdownsReceived < 2)
+ {
+ message = _consumer.receive(10000);
+ assertNotNull(message);
+
+ if (message.propertyExists(BackgroundMessageProducer.SHUTDOWN))
+ {
+ numberOfShutdownsReceived++;
+ }
+ else
+ {
+ numberOfMessagesReceived++;
+ putMessageInMap(message, messageSequenceNumbersByKey);
+ }
+ }
+
+ LOGGER.info("Finished receiving. Received " + numberOfMessagesReceived + " message(s) in total");
+
+ return messageSequenceNumbersByKey;
+ }
+
+ private void putMessageInMap(Message message, Map<String, Integer> messageSequenceNumbersByKey) throws JMSException
{
- Message send = producerSession.createTextMessage("Message: " + msg);
+ String keyValue = message.getStringProperty(KEY_PROPERTY);
+ Integer messageSequenceNumber = message.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY);
+ messageSequenceNumbersByKey.put(keyValue, messageSequenceNumber);
+ }
- send.setStringProperty("key", String.valueOf(msg % 10));
- send.setIntProperty("msg", msg);
+ private class BackgroundMessageProducer
+ {
+ static final String SHUTDOWN = "SHUTDOWN";
- return send;
+ private final String _threadName;
+
+ private volatile Exception _exception;
+
+ private Thread _thread;
+ private Map<String, Integer> _messageSequenceNumbersByKey = new HashMap<String, Integer>();
+ private CountDownLatch _quarterOfMessagesSentLatch = new CountDownLatch(MSG_COUNT/4);
+
+ public BackgroundMessageProducer(String threadName)
+ {
+ _threadName = threadName;
+ }
+
+ public void waitUntilQuarterOfMessagesSentToEncourageConflation() throws InterruptedException
+ {
+ final long latchTimeout = 60000;
+ boolean success = _quarterOfMessagesSentLatch.await(latchTimeout, TimeUnit.MILLISECONDS);
+ assertTrue("Failed to be notified that 1/4 of the messages have been sent within " + latchTimeout + " ms.", success);
+ LOGGER.info("Quarter of messages sent");
+ }
+
+ public Exception getException()
+ {
+ return _exception;
+ }
+
+ public Map<String, Integer> getMessageSequenceNumbersByKey()
+ {
+ return Collections.unmodifiableMap(_messageSequenceNumbersByKey);
+ }
+
+ public void startSendingMessages()
+ {
+ Runnable messageSender = new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ LOGGER.info("Starting to send in background thread");
+ Connection producerConnection = getConnection();
+ Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer backgroundProducer = producerSession.createProducer(_queue);
+ for (int messageNumber = 0; messageNumber < MSG_COUNT; messageNumber++)
+ {
+ Message message = nextMessage(messageNumber, producerSession, 2);
+ backgroundProducer.send(message);
+
+ putMessageInMap(message, _messageSequenceNumbersByKey);
+ _quarterOfMessagesSentLatch.countDown();
+ }
+
+ Message shutdownMessage = producerSession.createMessage();
+ shutdownMessage.setBooleanProperty(SHUTDOWN, true);
+ backgroundProducer.send(shutdownMessage);
+
+ LOGGER.info("Finished sending in background thread");
+ }
+ catch (Exception e)
+ {
+ _exception = e;
+ throw new RuntimeException(e);
+ }
+ }
+ };
+
+ _thread = new Thread(messageSender);
+ _thread.setName(_threadName);
+ _thread.start();
+ }
+
+ public void join() throws InterruptedException
+ {
+ final int timeoutInMillis = 120000;
+ _thread.join(timeoutInMillis);
+ assertFalse("Expected producer thread to finish within " + timeoutInMillis + "ms", _thread.isAlive());
+ }
+ }
+
+ private void createConflationQueue(Session session) throws AMQException
+ {
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put("qpid.last_value_queue_key",KEY_PROPERTY);
+ ((AMQSession<?,?>) session).createQueue(new AMQShortString(_queueName), false, true, false, arguments);
+ _queue = new AMQQueue("amq.direct", _queueName);
+ ((AMQSession<?,?>) session).declareAndBind((AMQDestination)_queue);
}
+ private Message nextMessage(int msg, Session producerSession) throws JMSException
+ {
+ return nextMessage(msg, producerSession, 10);
+ }
-}
+ private Message nextMessage(int msg, Session producerSession, int numberOfUniqueKeyValues) throws JMSException
+ {
+ Message send = producerSession.createTextMessage("Message: " + msg);
+ send.setStringProperty(KEY_PROPERTY, String.valueOf(msg % numberOfUniqueKeyValues));
+ send.setIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY, msg);
+ return send;
+ }
+}
Modified: qpid/branches/asyncstore/java/systests/src/main/java/org/apache/qpid/server/queue/ModelTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/systests/src/main/java/org/apache/qpid/server/queue/ModelTest.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/systests/src/main/java/org/apache/qpid/server/queue/ModelTest.java (original)
+++ qpid/branches/asyncstore/java/systests/src/main/java/org/apache/qpid/server/queue/ModelTest.java Fri Aug 3 12:13:32 2012
@@ -333,7 +333,7 @@ public class ModelTest extends QpidBroke
queueName));
assertEquals(queueName, managedQueue.getName());
- assertEquals(String.valueOf(owner), managedQueue.getOwner());
+ assertEquals(owner, managedQueue.getOwner());
assertEquals(durable, managedQueue.isDurable());
assertEquals(autoDelete, managedQueue.isAutoDelete());
}
Modified: qpid/branches/asyncstore/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLJMXTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLJMXTest.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLJMXTest.java (original)
+++ qpid/branches/asyncstore/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLJMXTest.java Fri Aug 3 12:13:32 2012
@@ -19,7 +19,6 @@
package org.apache.qpid.server.security.acl;
import org.apache.qpid.management.common.mbeans.ServerInformation;
-import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.security.access.ObjectType;
import org.apache.qpid.test.utils.JMXTestUtils;
@@ -30,7 +29,7 @@ import java.lang.management.RuntimeMXBea
* Tests that access to the JMX interface is governed only by {@link ObjectType#METHOD}/{@link ObjectType#ALL}
* rules and AMQP rights have no effect.
*
- * Ensures that objects outside the Qpid domain ({@link ManagedObject#DOMAIN}) are not governed by the ACL model.
+ * Ensures that objects outside the Qpid domain are not governed by the ACL model.
*/
public class ExternalACLJMXTest extends AbstractACLTestCase
{
Modified: qpid/branches/asyncstore/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java (original)
+++ qpid/branches/asyncstore/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java Fri Aug 3 12:13:32 2012
@@ -24,7 +24,6 @@ import org.apache.commons.configuration.
import org.apache.log4j.Logger;
import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.exchange.Exchange;
@@ -370,4 +369,10 @@ public class SlowMessageStore implements
return _realStore.getStoreLocation();
}
+ @Override
+ public String getStoreType()
+ {
+ return "SLOW";
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org