You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2012/03/10 20:22:37 UTC
svn commit: r1299257 [23/26] - in
/qpid/branches/rg-amqp-1-0-sandbox/qpid/java: broker-plugins/
broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/
broker-plugins/access-control/src/main/java/org/apache/qpid/serve...
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java Sat Mar 10 19:22:10 2012
@@ -20,12 +20,12 @@
*/
package org.apache.qpid.management.jmx;
-import javax.jms.Connection;
-
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.management.common.mbeans.ManagedBroker;
import org.apache.qpid.management.common.mbeans.ManagedConnection;
+import javax.jms.Connection;
+
/**
* Test generation of message statistics.
*/
@@ -162,29 +162,7 @@ public class MessageStatisticsTest exten
assertEquals("Incorrect test vhost total", 30, test.getTotalMessagesReceived());
assertEquals("Incorrect test vhost data", 30 * 100, test.getTotalDataReceived());
}
-
- /**
- * Test message peak rate generation.
- */
- public void testMessagePeakRates() throws Exception
- {
- sendUsing(_test, 2, 10);
- Thread.sleep(10000);
- sendUsing(_dev, 4, 10);
- Thread.sleep(10000);
-
- ManagedBroker test = _jmxUtils.getManagedBroker("test");
- ManagedBroker dev = _jmxUtils.getManagedBroker("development");
-
- assertApprox("Incorrect test vhost peak messages", 0.2d, 1.0d, test.getPeakMessageReceiptRate());
- assertApprox("Incorrect test vhost peak data", 0.2d, 10.0d, test.getPeakDataReceiptRate());
- assertApprox("Incorrect dev vhost peak messages", 0.2d, 2.0d, dev.getPeakMessageReceiptRate());
- assertApprox("Incorrect dev vhost peak data", 0.2d, 20.0d, dev.getPeakDataReceiptRate());
- assertApprox("Incorrect server peak messages", 0.2d, 2.0d, _jmxUtils.getServerInformation().getPeakMessageReceiptRate());
- assertApprox("Incorrect server peak data", 0.2d, 20.0d, _jmxUtils.getServerInformation().getPeakDataReceiptRate());
- }
-
/**
* Test message totals when a vhost has its statistics reset
*/
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTestCase.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTestCase.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTestCase.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTestCase.java Sat Mar 10 19:22:10 2012
@@ -20,13 +20,6 @@
*/
package org.apache.qpid.management.jmx;
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
@@ -37,6 +30,13 @@ import org.apache.qpid.framing.AMQShortS
import org.apache.qpid.test.utils.JMXTestUtils;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
/**
* Test generation of message statistics.
*/
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/BrokerStartupTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/BrokerStartupTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/BrokerStartupTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/BrokerStartupTest.java Sat Mar 10 19:22:10 2012
@@ -20,18 +20,17 @@
*/
package org.apache.qpid.server;
-import org.apache.qpid.server.logging.AbstractTestLogging;
-import org.apache.qpid.util.LogMonitor;
-import org.apache.log4j.Logger;
+import junit.framework.AssertionFailedError;
import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
-import java.util.List;
-
-import junit.framework.AssertionFailedError;
+import org.apache.qpid.server.logging.AbstractTestLogging;
+import org.apache.qpid.util.LogMonitor;
import javax.jms.Connection;
-import javax.jms.Session;
import javax.jms.Queue;
+import javax.jms.Session;
+import java.util.List;
/**
* Series of tests to validate the external Java broker starts up as expected.
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/configuration/ServerConfigurationFileTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/configuration/ServerConfigurationFileTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/configuration/ServerConfigurationFileTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/configuration/ServerConfigurationFileTest.java Sat Mar 10 19:22:10 2012
@@ -21,6 +21,7 @@
package org.apache.qpid.server.configuration;
import org.apache.commons.configuration.ConfigurationException;
+
import org.apache.qpid.test.utils.QpidBrokerTestCase;
/**
@@ -32,7 +33,7 @@ import org.apache.qpid.test.utils.QpidBr
*/
public class ServerConfigurationFileTest extends QpidBrokerTestCase
{
- ServerConfiguration _serverConfig;
+ private ServerConfiguration _serverConfig;
public void setUp() throws ConfigurationException
{
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java Sat Mar 10 19:22:10 2012
@@ -21,20 +21,8 @@
package org.apache.qpid.server.exchange;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import javax.jms.Connection;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
import org.apache.log4j.Logger;
+
import org.apache.qpid.client.AMQHeadersExchange;
import org.apache.qpid.client.AMQNoRouteException;
import org.apache.qpid.client.AMQQueue;
@@ -47,6 +35,18 @@ import org.apache.qpid.test.utils.QpidBr
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.BindingURL;
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
public class ReturnUnroutableMandatoryMessageTest extends QpidBrokerTestCase implements ExceptionListener
{
private static final Logger _logger = Logger.getLogger(ReturnUnroutableMandatoryMessageTest.class);
@@ -241,7 +241,7 @@ public class ReturnUnroutableMandatoryMe
con2.start();
MessageProducer nonMandatoryProducer = producerSession.createProducer(valid_topic, false, false);
- MessageProducer mandatoryProducer = producerSession.createProducer(invalid_topic);
+ MessageProducer mandatoryProducer = producerSession.createProducer(invalid_topic, false, true);
// First test - should be routed
_logger.info("Sending non-mandatory message");
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/FailoverMethodTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/FailoverMethodTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/FailoverMethodTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/FailoverMethodTest.java Sat Mar 10 19:22:10 2012
@@ -20,11 +20,8 @@
*/
package org.apache.qpid.server.failover;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQConnectionClosedException;
import org.apache.qpid.AMQDisconnectedException;
@@ -32,15 +29,17 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQConnectionURL;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
public class FailoverMethodTest extends QpidBrokerTestCase implements ExceptionListener
{
+ private static final Logger LOGGER = LoggerFactory.getLogger(FailoverMethodTest.class);
private CountDownLatch _failoverComplete = new CountDownLatch(1);
- protected static final Logger _logger = LoggerFactory.getLogger(FailoverMethodTest.class);
-
-
+ private final int _freePortWithNoBroker = findFreePort();
/**
* Test that the round robin method has the correct delays.
@@ -53,8 +52,8 @@ public class FailoverMethodTest extends
//note: The first broker has no connect delay and the default 1 retry
// while the tcp:localhost broker has 3 retries with a 2s connect delay
String connectionString = "amqp://guest:guest@/test?brokerlist=" +
- "'tcp://:" + getPort() +
- ";tcp://localhost:5670?connectdelay='2000',retries='3''";
+ "'tcp://localhost:" + getPort() +
+ ";tcp://localhost:" + _freePortWithNoBroker + "?connectdelay='2000',retries='3''";
AMQConnectionURL url = new AMQConnectionURL(connectionString);
@@ -65,7 +64,9 @@ public class FailoverMethodTest extends
connection.setExceptionListener(this);
+ LOGGER.debug("Stopping broker");
stopBroker();
+ LOGGER.debug("Stopped broker");
_failoverComplete.await(30, TimeUnit.SECONDS);
assertEquals("failoverLatch was not decremented in given timeframe",
@@ -109,7 +110,9 @@ public class FailoverMethodTest extends
connection.setExceptionListener(this);
+ LOGGER.debug("Stopping broker");
stopBroker();
+ LOGGER.debug("Stopped broker");
_failoverComplete.await(30, TimeUnit.SECONDS);
assertEquals("failoverLatch was not decremented in given timeframe",
@@ -138,18 +141,6 @@ public class FailoverMethodTest extends
}
}
- public void onException(JMSException e)
- {
- if (e.getLinkedException() instanceof AMQDisconnectedException || e.getLinkedException() instanceof AMQConnectionClosedException)
- {
- _logger.debug("Received AMQDisconnectedException");
- _failoverComplete.countDown();
- }
- else
- {
- _logger.error("Unexpected underlying exception", e.getLinkedException());
- }
- }
/**
* Test that setting 'nofailover' as the failover policy does not result in
@@ -200,13 +191,11 @@ public class FailoverMethodTest extends
}
catch (Exception e)
{
- System.err.println(e.getMessage());
- e.printStackTrace();
+ LOGGER.error("Exception whilst starting broker", e);
}
}
});
-
brokerStart.start();
long start = System.currentTimeMillis();
@@ -260,11 +249,17 @@ public class FailoverMethodTest extends
}
}
- public void stopBroker(int port) throws Exception
+ @Override
+ public void onException(JMSException e)
{
- if (isBrokerPresent(port))
+ if (e.getLinkedException() instanceof AMQDisconnectedException || e.getLinkedException() instanceof AMQConnectionClosedException)
+ {
+ LOGGER.debug("Received AMQDisconnectedException");
+ _failoverComplete.countDown();
+ }
+ else
{
- super.stopBroker(port);
+ LOGGER.error("Unexpected underlying exception", e.getLinkedException());
}
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java Sat Mar 10 19:22:10 2012
@@ -22,14 +22,15 @@
package org.apache.qpid.server.failure;
import junit.framework.TestCase;
-import org.apache.qpid.test.utils.QpidClientConnectionHelper;
-import org.apache.qpid.client.failover.FailoverException;
+import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQException;
+import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.log4j.Logger;
+import org.apache.qpid.test.utils.QpidClientConnectionHelper;
-import javax.jms.JMSException;
import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
import java.io.IOException;
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AbstractTestLogging.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AbstractTestLogging.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AbstractTestLogging.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AbstractTestLogging.java Sat Mar 10 19:22:10 2012
@@ -20,14 +20,8 @@
*/
package org.apache.qpid.server.logging;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-
import org.apache.commons.configuration.ConfigurationException;
+
import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.logging.subjects.AbstractTestLogSubject;
import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -35,6 +29,13 @@ import org.apache.qpid.server.util.Inter
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import org.apache.qpid.util.LogMonitor;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
/**
* Abstract superclass for logging test set up and utility methods.
*
@@ -46,7 +47,7 @@ public class AbstractTestLogging extends
public static final String TEST_LOG_PREFIX = "MESSAGE";
protected LogMonitor _monitor;
- InternalBrokerBaseCase _configLoader;
+ private InternalBrokerBaseCase _configLoader;
@Override
public void setUp() throws Exception
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AccessControlLoggingTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AccessControlLoggingTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AccessControlLoggingTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AccessControlLoggingTest.java Sat Mar 10 19:22:10 2012
@@ -18,18 +18,17 @@
*/
package org.apache.qpid.server.logging;
-import java.util.List;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Session;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.security.acl.AbstractACLTestCase;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import java.util.List;
+
/**
* ACL version 2/3 file testing to verify that ACL actor logging works correctly.
*
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AlertingTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AlertingTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AlertingTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AlertingTest.java Sat Mar 10 19:22:10 2012
@@ -20,10 +20,6 @@
*/
package org.apache.qpid.server.logging;
-import javax.jms.Connection;
-import javax.jms.Queue;
-import javax.jms.Session;
-
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.framing.AMQShortString;
@@ -31,6 +27,10 @@ import org.apache.qpid.server.configurat
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.util.FileUtils;
+import javax.jms.Connection;
+import javax.jms.Queue;
+import javax.jms.Session;
+
public class AlertingTest extends AbstractTestLogging
{
private String VIRTUALHOST = "test";
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java Sat Mar 10 19:22:10 2012
@@ -45,10 +45,10 @@ public class BindingLoggingTest extends
static final String BND_PREFIX = "BND-";
- Connection _connection;
- Session _session;
- Queue _queue;
- Topic _topic;
+ private Connection _connection;
+ private Session _session;
+ private Queue _queue;
+ private Topic _topic;
@Override
public void setUp() throws Exception
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java Sat Mar 10 19:22:10 2012
@@ -23,11 +23,9 @@ package org.apache.qpid.server.logging;
import junit.framework.AssertionFailedError;
import org.apache.qpid.server.BrokerOptions;
-import org.apache.qpid.server.Main;
import org.apache.qpid.transport.ConnectionException;
import org.apache.qpid.util.LogMonitor;
-import java.io.File;
import java.io.IOException;
import java.net.Socket;
import java.util.List;
@@ -39,7 +37,7 @@ import java.util.List;
*
* BRK-1001 : Startup : Version: <Version> Build: <Build>
* BRK-1002 : Starting : Listening on <Transport> port <Port>
- * BRK-1003 : Shuting down : <Transport> port <Port>
+ * BRK-1003 : Shutting down : <Transport> port <Port>
* BRK-1004 : Ready
* BRK-1005 : Stopped
* BRK-1006 : Using configuration : <path>
@@ -809,7 +807,7 @@ public class BrokerLoggingTest extends A
setConfigurationProperty("connector.ssl.keyStorePath", getConfigurationStringProperty("management.ssl.keyStorePath"));
setConfigurationProperty("connector.ssl.keyStorePassword", getConfigurationStringProperty("management.ssl.keyStorePassword"));
- Integer sslPort = Integer.parseInt(getConfigurationStringProperty("connector.sslport"));
+ Integer sslPort = Integer.parseInt(getConfigurationStringProperty("connector.ssl.port"));
startBroker();
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ConnectionLoggingTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ConnectionLoggingTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ConnectionLoggingTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ConnectionLoggingTest.java Sat Mar 10 19:22:10 2012
@@ -21,8 +21,11 @@
package org.apache.qpid.server.logging;
import javax.jms.Connection;
-import java.util.List;
+
+import org.apache.qpid.common.QpidProperties;
+
import java.util.HashMap;
+import java.util.List;
import java.util.TreeSet;
public class ConnectionLoggingTest extends AbstractTestLogging
@@ -58,7 +61,7 @@ public class ConnectionLoggingTest exten
// Wait until opened
waitForMessage("CON-1001");
- // Close the conneciton
+ // Close the connection
connection.close();
// Wait to ensure that the desired message is logged
@@ -66,18 +69,10 @@ public class ConnectionLoggingTest exten
List<String> results = waitAndFindMatches("CON-1001");
- // Validation
- // We should have at least three messages when running InVM but when running External
- // we will get 0-10 negotiation on con:0 whcih may close at some random point
- // MESSAGE [con:0(/127.0.0.1:46926)] CON-1001 : Open
- // MESSAGE [con:0(/127.0.0.1:46926)] CON-1001 : Open : Protocol Version : 0-10
// MESSAGE [con:1(/127.0.0.1:46927)] CON-1001 : Open
// MESSAGE [con:1(/127.0.0.1:46927)] CON-1001 : Open : Protocol Version : 0-9
- // MESSAGE [con:0(/127.0.0.1:46926)] CON-1002 : Close
- // MESSAGE [con:1(/127.0.0.1:46927)] CON-1001 : Open : Client ID : clientid : Protocol Version : 0-9
-
- //So check how many connections we have in the result set and extract the last one.
- // When running InVM we will have con:0 and externally con:1
+ // MESSAGE [con:1(/127.0.0.1:46927)] CON-1001 : Open : Client ID : clientid : Protocol Version : 0-9 : Client Version : 1.2.3_4
+ // MESSAGE [con:0(/127.0.0.1:46927)] CON-1002 : Close
HashMap<Integer, List<String>> connectionData = splitResultsOnConnectionID(results);
@@ -87,31 +82,26 @@ public class ConnectionLoggingTest exten
//Use just the data from the last connection for the test
results = connectionData.get(connectionID);
- // If we are running inVM or with 0-10 we will get three open messagse
- // if running externally with 0-8/0-9 we will also have open and close messages from the failed 0-10 negotiation
- assertTrue("CON messages not logged:" + results.size(), results.size() >= 3);
+ assertEquals("Unexpected CON-1001 messages count", 3, results.size());
String log = getLogMessage(results, 0);
// MESSAGE [con:1(/127.0.0.1:52540)] CON-1001 : Open
//1 & 2
validateMessageID("CON-1001",log);
- // validate the last three CON- messages.
- // This is because when running externally we may also have logged the failed
- // 0-10 negotiation messages if using 0-8/0-9 on the broker.
-
- // 3 - Assert the options are correct
- // MESSAGE [con:1(/127.0.0.1:52540)] CON-1001 : Open : Client ID : clientid : Protocol Version : 0-9
- validateConnectionOpen(results, 0, true, true, clientid);
+ // validate the last three CON-1001 messages.
+ // MESSAGE [con:1(/127.0.0.1:52540)] CON-1001 : Open : Client ID : clientid : Protocol Version : 0-9 : Client Version : 1.2.3_4
+ validateConnectionOpen(results, 0, true, true, clientid, true, QpidProperties.getReleaseVersion());
// MESSAGE [con:1(/127.0.0.1:52540)] CON-1001 : Open : Protocol Version : 0-9
- validateConnectionOpen(results, 1, true, false, null);
+ validateConnectionOpen(results, 1, true, false, null, false, null);
- validateConnectionOpen(results, 2, false, false, null);
+ // MESSAGE [con:1(/127.0.0.1:52540)] CON-1001 : Open
+ validateConnectionOpen(results, 2, false, false, null, false, null);
}
private void validateConnectionOpen(List<String> results, int positionFromEnd,
- boolean protocolVersionPresent, boolean clientIdOptionPresent, String clientIdValue)
+ boolean protocolVersionPresent, boolean clientIdOptionPresent, String clientIdValue, boolean clientVersionPresent, String clientVersionValue)
{
String log = getLogMessageFromEnd(results, positionFromEnd);
@@ -128,6 +118,13 @@ public class ConnectionLoggingTest exten
protocolVersionPresent, fromMessage(log).contains("Protocol Version :"));
//fixme there is no way currently to find out the negotiated protocol version
// The delegate is the versioned class ((AMQConnection)connection)._delegate
+
+ assertEquals("unexpected Client ID option state", clientVersionPresent, fromMessage(log).contains("Client Version :"));
+
+ if(clientVersionPresent && clientVersionValue != null)
+ {
+ assertTrue("Client version value is not present: " + clientVersionValue, fromMessage(log).contains(clientVersionValue));
+ }
}
/**
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/DerbyMessageStoreLoggingTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/DerbyMessageStoreLoggingTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/DerbyMessageStoreLoggingTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/DerbyMessageStoreLoggingTest.java Sat Mar 10 19:22:10 2012
@@ -20,15 +20,14 @@
*/
package org.apache.qpid.server.logging;
-import org.apache.commons.configuration.Configuration;
import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.logging.subjects.AbstractTestLogSubject;
import javax.jms.Connection;
import javax.jms.Queue;
import javax.jms.Session;
-import java.util.List;
import java.io.File;
+import java.util.List;
/**
* The MessageStore test suite validates that the follow log messages as
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java Sat Mar 10 19:22:10 2012
@@ -20,16 +20,6 @@
*/
package org.apache.qpid.server.logging;
-import java.io.IOException;
-import java.util.List;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQSession_0_10;
@@ -39,6 +29,15 @@ import org.apache.qpid.framing.ExchangeD
import org.apache.qpid.framing.ExchangeDeleteOkBody;
import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.IOException;
+import java.util.List;
+
/**
* Exchange
*
@@ -54,11 +53,11 @@ public class ExchangeLoggingTest extends
static final String EXH_PREFIX = "EXH-";
- Connection _connection;
- Session _session;
- Queue _queue;
- String _name;
- String _type;
+ private Connection _connection;
+ private Session _session;
+ private Queue _queue;
+ private String _name;
+ private String _type;
@Override
public void setUp() throws Exception
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ManagementLoggingTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ManagementLoggingTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ManagementLoggingTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ManagementLoggingTest.java Sat Mar 10 19:22:10 2012
@@ -25,8 +25,8 @@ import org.apache.qpid.server.configurat
import org.apache.qpid.test.utils.JMXTestUtils;
import org.apache.qpid.util.LogMonitor;
-import java.util.List;
import java.io.File;
+import java.util.List;
/**
* Management Console Test Suite
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/MemoryMessageStoreLoggingTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/MemoryMessageStoreLoggingTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/MemoryMessageStoreLoggingTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/MemoryMessageStoreLoggingTest.java Sat Mar 10 19:22:10 2012
@@ -20,13 +20,13 @@
*/
package org.apache.qpid.server.logging;
-import java.util.Arrays;
-import java.util.List;
-
import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.logging.subjects.AbstractTestLogSubject;
import org.apache.qpid.util.LogMonitor;
+import java.util.Arrays;
+import java.util.List;
+
/**
* The MessageStore test suite validates the following log messages as
* specified in the Functional Specification.
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/QueueLoggingTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/QueueLoggingTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/QueueLoggingTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/QueueLoggingTest.java Sat Mar 10 19:22:10 2012
@@ -20,19 +20,19 @@
*/
package org.apache.qpid.server.logging;
+import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.server.logging.subjects.AbstractTestLogSubject;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.AMQException;
+import org.apache.qpid.server.logging.subjects.AbstractTestLogSubject;
import javax.jms.Connection;
-import javax.jms.Session;
-import javax.jms.Queue;
import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.Session;
import javax.naming.NamingException;
-import java.util.List;
import java.io.IOException;
+import java.util.List;
/**
* The Queue test suite validates the following log messages as specified in
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java Sat Mar 10 19:22:10 2012
@@ -20,16 +20,18 @@
*/
package org.apache.qpid.server.logging;
+import javax.jms.QueueBrowser;
import junit.framework.AssertionFailedError;
+
import org.apache.qpid.client.AMQConnection;
import javax.jms.Connection;
import javax.jms.JMSException;
+import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
-import javax.jms.Message;
import java.io.IOException;
import java.util.List;
@@ -48,10 +50,10 @@ public class SubscriptionLoggingTest ext
{
static final String SUB_PREFIX = "SUB-";
- Connection _connection;
- Session _session;
- Queue _queue;
- Topic _topic;
+ private Connection _connection;
+ private Session _session;
+ private Queue _queue;
+ private Topic _topic;
@Override
public void setUp() throws Exception
@@ -165,8 +167,10 @@ public class SubscriptionLoggingTest ext
*/
public void testSubscriptionCreateQueueBrowser() throws JMSException, IOException
{
- _session.createBrowser(_queue);
+ _connection.start();
+ QueueBrowser browser = _session.createBrowser(_queue);
+ browser.getEnumeration();
//Validate
//Ensure that we wait for the SUB log message
waitAndFindMatches("SUB-1001");
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/VirtualHostLoggingTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/VirtualHostLoggingTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/VirtualHostLoggingTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/VirtualHostLoggingTest.java Sat Mar 10 19:22:10 2012
@@ -21,13 +21,13 @@
package org.apache.qpid.server.logging;
-import java.util.Arrays;
-import java.util.List;
-
import junit.framework.AssertionFailedError;
import org.apache.qpid.server.configuration.ServerConfiguration;
+import java.util.Arrays;
+import java.util.List;
+
/**
* Virtualhost Test Cases
* The virtualhost test suite validates the following log messages as specified in the Functional Specification.
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/message/MessageProtocolConversionTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/message/MessageProtocolConversionTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/message/MessageProtocolConversionTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/message/MessageProtocolConversionTest.java Sat Mar 10 19:22:10 2012
@@ -23,10 +23,8 @@ package org.apache.qpid.server.message;
import org.apache.qpid.AMQException;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.url.AMQBindingURL;
import javax.jms.*;
-import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java Sat Mar 10 19:22:10 2012
@@ -20,16 +20,8 @@
*/
package org.apache.qpid.server.persistent;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.jms.ConnectionListener;
-import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.jms.ConnectionURL;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
-import org.apache.qpid.server.store.DerbyMessageStore;
-import org.apache.commons.configuration.XMLConfiguration;
+import java.util.ArrayList;
+import java.util.List;
import javax.jms.Connection;
import javax.jms.JMSException;
@@ -38,61 +30,28 @@ import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.CountDownLatch;
-import java.io.File;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
/**
- * QPID-1813 : We do not store the client id with a message so on store restart
- * that information is lost and we are unable to perform no local checks.
- *
- * QPID-1813 highlights the lack of testing here as the broker will NPE as it
- * assumes that the client id of the publisher will always exist
+ * Verifies that after recovery, a new Connection with no-local in use is
+ * able to receive messages sent prior to the broker restart.
*/
-public class NoLocalAfterRecoveryTest extends QpidBrokerTestCase implements ConnectionListener
+public class NoLocalAfterRecoveryTest extends QpidBrokerTestCase
{
protected final String MY_TOPIC_SUBSCRIPTION_NAME = this.getName();
protected static final int SEND_COUNT = 10;
- private CountDownLatch _failoverComplete = new CountDownLatch(1);
-
- protected ConnectionURL _connectionURL;
-
- @Override
- protected void setUp() throws Exception
- {
-
- XMLConfiguration configuration = new XMLConfiguration(_configFile);
- configuration.setProperty("virtualhosts.virtualhost.test.store.class", "org.apache.qpid.server.store.DerbyMessageStore");
- configuration.setProperty("virtualhosts.virtualhost.test.store."+ DerbyMessageStore.ENVIRONMENT_PATH_PROPERTY,
- System.getProperty("QPID_WORK", System.getProperty("java.io.tmpdir")) + File.separator + "derbyDB-NoLocalAfterRecoveryTest");
-
- File tmpFile = File.createTempFile("configFile", "test");
- tmpFile.deleteOnExit();
- configuration.save(tmpFile);
-
- _configFile = tmpFile;
- _connectionURL = getConnectionURL();
-
- BrokerDetails details = _connectionURL.getBrokerDetails(0);
-
- // This will attempt to failover for 3 seconds.
- // Local testing suggests failover takes 2 seconds
- details.setProperty(BrokerDetails.OPTIONS_RETRY, "10");
- details.setProperty(BrokerDetails.OPTIONS_CONNECT_DELAY, "500");
-
- super.setUp();
- }
- public void test() throws Exception
+ public void testNoLocalNotQueued() throws Exception
{
+ if(!isBrokerStorePersistent())
+ {
+ fail("This test requires a broker with a persistent store");
+ }
- Connection connection = getConnection(_connectionURL);
+ Connection connection = getConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-
- Topic topic = (Topic) getInitialContext().lookup("topic");
+ Topic topic = session.createTopic(MY_TOPIC_SUBSCRIPTION_NAME);
TopicSubscriber noLocalSubscriber = session.
createDurableSubscriber(topic, MY_TOPIC_SUBSCRIPTION_NAME + "-NoLocal",
@@ -102,88 +61,104 @@ public class NoLocalAfterRecoveryTest ex
createDurableSubscriber(topic, MY_TOPIC_SUBSCRIPTION_NAME + "-Normal",
null, false);
- List<Message> sent = sendMessage(session, topic, SEND_COUNT);
-
- session.commit();
-
- assertEquals("Incorrect number of messages sent",
- SEND_COUNT, sent.size());
-
+ sendMessage(session, topic, SEND_COUNT);
// Check messages can be received as expected.
connection.start();
- assertTrue("No Local Subscriber is not a no-local subscriber",
- noLocalSubscriber.getNoLocal());
-
- assertFalse("Normal Subscriber is a no-local subscriber",
- normalSubscriber.getNoLocal());
-
-
+ //As the no-local subscriber was on the same connection the messages were
+ //published on, it will receive no messages as they will be discarded on the broker
List<Message> received = receiveMessage(noLocalSubscriber, SEND_COUNT);
assertEquals("No Local Subscriber Received messages", 0, received.size());
received = receiveMessage(normalSubscriber, SEND_COUNT);
assertEquals("Normal Subscriber Received no messages",
SEND_COUNT, received.size());
+ session.commit();
+ normalSubscriber.close();
+ connection.close();
- ((AMQConnection)connection).setConnectionListener(this);
-
+ //Ensure the no-local subscriber's messages were discarded by restarting the broker
+ //and reconnecting to the subscription to ensure they were not recovered.
restartBroker();
+ Connection connection2 = getConnection();
+ connection2.start();
+
+ Session session2 = connection2.createSession(true, Session.SESSION_TRANSACTED);
+ Topic topic2 = session2.createTopic(MY_TOPIC_SUBSCRIPTION_NAME);
+
+ TopicSubscriber noLocalSubscriber2 = session2.
+ createDurableSubscriber(topic2, MY_TOPIC_SUBSCRIPTION_NAME + "-NoLocal",
+ null, true);
+
+ // The NO-local subscriber should not get any messages
+ received = receiveMessage(noLocalSubscriber2, SEND_COUNT);
+ session2.commit();
+ assertEquals("No Local Subscriber Received messages", 0, received.size());
+
+ noLocalSubscriber2.close();
+
+
+ }
- //Await
- if (!_failoverComplete.await(4000L, TimeUnit.MILLISECONDS))
+
+ public void testNonNoLocalQueued() throws Exception
+ {
+ if(!isBrokerStorePersistent())
{
- fail("Failover Failed to compelete");
+ fail("This test requires a broker with a persistent store");
}
- session.rollback();
+ Connection connection = getConnection();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Topic topic = session.createTopic(MY_TOPIC_SUBSCRIPTION_NAME);
- //Failover will restablish our clients
- assertTrue("No Local Subscriber is not a no-local subscriber",
- noLocalSubscriber.getNoLocal());
+ TopicSubscriber noLocalSubscriber =
+ session.createDurableSubscriber(topic, MY_TOPIC_SUBSCRIPTION_NAME + "-NoLocal", null, true);
- assertFalse("Normal Subscriber is a no-local subscriber",
- normalSubscriber.getNoLocal());
+ sendMessage(session, topic, SEND_COUNT);
+
+ // Check messages can be received as expected.
+ connection.start();
+
+ List<Message> received = receiveMessage(noLocalSubscriber, SEND_COUNT);
+ assertEquals("No Local Subscriber Received messages", 0, received.size());
- // NOTE : here that the NO-local subscriber actually now gets ALL the
- // messages as the connection has failed and they are consuming on a
- // different connnection to the one that was published on.
- received = receiveMessage(noLocalSubscriber, SEND_COUNT);
- assertEquals("No Local Subscriber Received messages", SEND_COUNT, received.size());
- received = receiveMessage(normalSubscriber, SEND_COUNT);
- assertEquals("Normal Subscriber Received no messages",
- SEND_COUNT, received.size());
- //leave the store in a clean state.
session.commit();
- }
- protected List<Message> assertReceiveMessage(MessageConsumer messageConsumer,
- int count) throws JMSException
- {
+ Connection connection3 = getConnection();
+ Session session3 = connection3.createSession(true, Session.SESSION_TRANSACTED);
+ sendMessage(session3, topic, SEND_COUNT);
- List<Message> receivedMessages = new ArrayList<Message>(count);
- for (int i = 0; i < count; i++)
- {
- Message received = messageConsumer.receive(1000);
- if (received != null)
- {
- receivedMessages.add(received);
- }
- else
- {
- fail("Only "
- + receivedMessages.size() + "/" + count + " received.");
- }
- }
+ connection.close();
+
+ //We didn't receive the messages on the durable queue for the no-local subscriber
+ //so they are still on the broker. Restart the broker, prompting their recovery.
+ restartBroker();
+
+ Connection connection2 = getConnection();
+ connection2.start();
+
+ Session session2 = connection2.createSession(true, Session.SESSION_TRANSACTED);
+ Topic topic2 = session2.createTopic(MY_TOPIC_SUBSCRIPTION_NAME);
+
+ TopicSubscriber noLocalSubscriber2 =
+ session2.createDurableSubscriber(topic2, MY_TOPIC_SUBSCRIPTION_NAME + "-NoLocal",null, true);
+
+ // The NO-local subscriber should receive messages sent from connection3
+ received = receiveMessage(noLocalSubscriber2, SEND_COUNT);
+ session2.commit();
+ assertEquals("No Local Subscriber did not receive expected messages", SEND_COUNT, received.size());
+
+ noLocalSubscriber2.close();
+
- return receivedMessages;
}
protected List<Message> receiveMessage(MessageConsumer messageConsumer,
@@ -207,29 +182,4 @@ public class NoLocalAfterRecoveryTest ex
return receivedMessages;
}
-
- public void bytesSent(long count)
- {
-
- }
-
- public void bytesReceived(long count)
- {
-
- }
-
- public boolean preFailover(boolean redirect)
- {
- return true;
- }
-
- public boolean preResubscribe()
- {
- return true;
- }
-
- public void failoverComplete()
- {
- _failoverComplete.countDown();
- }
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java Sat Mar 10 19:22:10 2012
@@ -20,8 +20,8 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
import javax.jms.Connection;
import javax.jms.JMSException;
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ModelTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ModelTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ModelTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ModelTest.java Sat Mar 10 19:22:10 2012
@@ -21,7 +21,6 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.management.common.mbeans.ManagedBroker;
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java Sat Mar 10 19:22:10 2012
@@ -20,9 +20,9 @@
*/
package org.apache.qpid.server.queue;
-import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
import javax.jms.Connection;
import javax.jms.JMSException;
@@ -30,9 +30,9 @@ import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
public class MultipleTransactedBatchProducerTest extends QpidBrokerTestCase
{
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PersistentTestManual.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PersistentTestManual.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PersistentTestManual.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PersistentTestManual.java Sat Mar 10 19:22:10 2012
@@ -21,6 +21,7 @@
package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQChannelClosedException;
import org.apache.qpid.AMQConnectionClosedException;
import org.apache.qpid.AMQException;
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityQueueTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityQueueTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityQueueTest.java Sat Mar 10 19:22:10 2012
@@ -20,8 +20,12 @@
*/
package org.apache.qpid.server.queue;
-import java.util.HashMap;
-import java.util.Map;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -30,11 +34,8 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.NamingException;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import java.util.HashMap;
+import java.util.Map;
public class PriorityQueueTest extends QpidBrokerTestCase
{
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java Sat Mar 10 19:22:10 2012
@@ -20,38 +20,44 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.log4j.Logger;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.AMQDestination;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.server.logging.AbstractTestLogging;
import org.apache.qpid.test.utils.JMXTestUtils;
-import org.apache.qpid.framing.AMQShortString;
-
-import javax.jms.*;
-import javax.naming.NamingException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.io.IOException;
public class ProducerFlowControlTest extends AbstractTestLogging
{
private static final int TIMEOUT = 10000;
- private static final Logger _logger = Logger.getLogger(ProducerFlowControlTest.class);
-
private Connection producerConnection;
- private MessageProducer producer;
- private Session producerSession;
- private Queue queue;
private Connection consumerConnection;
+ private Session producerSession;
private Session consumerSession;
-
+ private MessageProducer producer;
private MessageConsumer consumer;
- private final AtomicInteger _sentMessages = new AtomicInteger();
+ private Queue queue;
+
+ private final AtomicInteger _sentMessages = new AtomicInteger(0);
private JMXTestUtils _jmxUtils;
private boolean _jmxUtilConnected;
@@ -77,37 +83,34 @@ public class ProducerFlowControlTest ext
public void tearDown() throws Exception
{
- if(_jmxUtilConnected)
+ try
{
- try
+ if(_jmxUtilConnected)
{
- _jmxUtils.close();
- }
- catch (IOException e)
- {
- e.printStackTrace();
+ try
+ {
+ _jmxUtils.close();
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ }
}
+ producerConnection.close();
+ consumerConnection.close();
+ }
+ finally
+ {
+ super.tearDown();
}
- producerConnection.close();
- consumerConnection.close();
- super.tearDown();
}
- public void testCapacityExceededCausesBlock()
- throws JMSException, NamingException, AMQException, InterruptedException
+ public void testCapacityExceededCausesBlock() throws Exception
{
String queueName = getTestQueueName();
-
- final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put("x-qpid-capacity",1000);
- arguments.put("x-qpid-flow-resume-capacity",800);
- ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments);
- queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
- ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
- producer = producerSession.createProducer(queue);
-
- _sentMessages.set(0);
+ createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 1000, 800);
+ producer = producerSession.createProducer(queue);
// try to send 5 messages (should block after 4)
sendMessagesAsync(producer, producerSession, 5, 50L);
@@ -135,22 +138,14 @@ public class ProducerFlowControlTest ext
}
- public void testBrokerLogMessages()
- throws JMSException, NamingException, AMQException, InterruptedException, IOException
+
+ public void testBrokerLogMessages() throws Exception
{
String queueName = getTestQueueName();
- final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put("x-qpid-capacity",1000);
- arguments.put("x-qpid-flow-resume-capacity",800);
- ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments);
- queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
- ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+ createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 1000, 800);
producer = producerSession.createProducer(queue);
- _sentMessages.set(0);
-
-
// try to send 5 messages (should block after 4)
sendMessagesAsync(producer, producerSession, 5, 50L);
@@ -162,41 +157,28 @@ public class ProducerFlowControlTest ext
consumerConnection.start();
- while(consumer.receive(1000) != null);
+ while(consumer.receive(1000) != null) {};
results = waitAndFindMatches("QUE-1004");
assertEquals("Did not find correct number of UNDERFULL queue underfull messages", 1, results.size());
-
-
-
}
- public void testClientLogMessages()
- throws JMSException, NamingException, AMQException, InterruptedException, IOException
+ public void testClientLogMessages() throws Exception
{
String queueName = getTestQueueName();
-
+
setTestClientSystemProperty("qpid.flow_control_wait_failure","3000");
setTestClientSystemProperty("qpid.flow_control_wait_notify_period","1000");
Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put("x-qpid-capacity",1000);
- arguments.put("x-qpid-flow-resume-capacity",800);
- ((AMQSession) session).createQueue(new AMQShortString(queueName), true, false, false, arguments);
- queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
- ((AMQSession) session).declareAndBind((AMQDestination)queue);
+ createAndBindQueueWithFlowControlEnabled(session, queueName, 1000, 800);
producer = session.createProducer(queue);
- _sentMessages.set(0);
-
-
// try to send 5 messages (should block after 4)
- MessageSender sender = sendMessagesAsync(producer, producerSession, 5, 50L);
+ MessageSender sender = sendMessagesAsync(producer, session, 5, 50L);
List<String> results = waitAndFindMatches("Message send delayed by", TIMEOUT);
assertTrue("No delay messages logged by client",results.size()!=0);
@@ -205,26 +187,16 @@ public class ProducerFlowControlTest ext
+ " flow control", TIMEOUT);
assertEquals("Incorrect number of send failure messages logged by client (got " + results.size() + " delay "
+ "messages)",1,failedMessages.size());
-
-
-
}
- public void testFlowControlOnCapacityResumeEqual()
- throws JMSException, NamingException, AMQException, InterruptedException
+ public void testFlowControlOnCapacityResumeEqual() throws Exception
{
String queueName = getTestQueueName();
- final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put("x-qpid-capacity",1000);
- arguments.put("x-qpid-flow-resume-capacity",1000);
- ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments);
- queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
- ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+ createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 1000, 1000);
producer = producerSession.createProducer(queue);
- _sentMessages.set(0);
// try to send 5 messages (should block after 4)
sendMessagesAsync(producer, producerSession, 5, 50L);
@@ -236,7 +208,6 @@ public class ProducerFlowControlTest ext
consumer = consumerSession.createConsumer(queue);
consumerConnection.start();
-
consumer.receive();
Thread.sleep(1000);
@@ -247,23 +218,16 @@ public class ProducerFlowControlTest ext
}
- public void testFlowControlSoak()
- throws Exception, NamingException, AMQException, InterruptedException
+ public void testFlowControlSoak() throws Exception
{
String queueName = getTestQueueName();
- _sentMessages.set(0);
+
final int numProducers = 10;
final int numMessages = 100;
- final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put("x-qpid-capacity",6000);
- arguments.put("x-qpid-flow-resume-capacity",3000);
-
- ((AMQSession) consumerSession).createQueue(new AMQShortString(queueName), false, false, false, arguments);
+ createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 6000, 3000);
- queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='false'");
- ((AMQSession) consumerSession).declareAndBind((AMQDestination)queue);
consumerConnection.start();
Connection[] producers = new Connection[numProducers];
@@ -303,58 +267,38 @@ public class ProducerFlowControlTest ext
}
-
-
- public void testSendTimeout()
- throws JMSException, NamingException, AMQException, InterruptedException
+ public void testSendTimeout() throws Exception
{
String queueName = getTestQueueName();
-
+ final String expectedMsg = isBroker010() ? "Exception when sending message:timed out waiting for message credit"
+ : "Unable to send message for 3 seconds due to broker enforced flow control";
+
setTestClientSystemProperty("qpid.flow_control_wait_failure","3000");
Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put("x-qpid-capacity",1000);
- arguments.put("x-qpid-flow-resume-capacity",800);
- ((AMQSession) session).createQueue(new AMQShortString(queueName), true, false, false, arguments);
- queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
- ((AMQSession) session).declareAndBind((AMQDestination)queue);
+ createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 1000, 800);
producer = session.createProducer(queue);
- _sentMessages.set(0);
-
-
// try to send 5 messages (should block after 4)
- MessageSender sender = sendMessagesAsync(producer, producerSession, 5, 100L);
+ MessageSender sender = sendMessagesAsync(producer, session, 5, 100L);
-
- Thread.sleep(10000);
-
- Exception e = sender.getException();
+ Exception e = sender.awaitSenderException(10000);
assertNotNull("No timeout exception on sending", e);
+
+ assertEquals("Unexpected exception reason", expectedMsg, e.getMessage());
+
}
-
-
- public void testFlowControlAttributeModificationViaJMX()
- throws JMSException, NamingException, AMQException, InterruptedException, Exception
+
+ public void testFlowControlAttributeModificationViaJMX() throws Exception
{
_jmxUtils.open();
_jmxUtilConnected = true;
String queueName = getTestQueueName();
-
- //create queue
- final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put("x-qpid-capacity",0);
- arguments.put("x-qpid-flow-resume-capacity",0);
- ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments);
-
- queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
- ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+ createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 0, 0);
producer = producerSession.createProducer(queue);
Thread.sleep(1000);
@@ -375,7 +319,7 @@ public class ProducerFlowControlTest ext
assertFalse("Queue should not be overfull", queueMBean.isFlowOverfull());
// try to send 2 messages (should block after 1)
- _sentMessages.set(0);
+
sendMessagesAsync(producer, producerSession, 2, 50L);
Thread.sleep(2000);
@@ -406,13 +350,23 @@ public class ProducerFlowControlTest ext
consumer.receive();
//perform a synchronous op on the connection
- ((AMQSession) consumerSession).sync();
+ ((AMQSession<?,?>) consumerSession).sync();
assertFalse("Queue should not be overfull", queueMBean.isFlowOverfull());
consumer.receive();
}
+ private void createAndBindQueueWithFlowControlEnabled(Session session, String queueName, int capacity, int resumeCapacity) throws Exception
+ {
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put("x-qpid-capacity",capacity);
+ arguments.put("x-qpid-flow-resume-capacity",resumeCapacity);
+ ((AMQSession<?,?>) session).createQueue(new AMQShortString(queueName), true, false, false, arguments);
+ queue = session.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
+ ((AMQSession<?,?>) session).declareAndBind((AMQDestination)queue);
+ }
+
private MessageSender sendMessagesAsync(final MessageProducer producer,
final Session producerSession,
final int numMessages,
@@ -435,7 +389,7 @@ public class ProducerFlowControlTest ext
try
{
- ((AMQSession)producerSession).sync();
+ ((AMQSession<?,?>)producerSession).sync();
}
catch (AMQException e)
{
@@ -456,7 +410,6 @@ public class ProducerFlowControlTest ext
private static final byte[] BYTE_300 = new byte[300];
-
private Message nextMessage(int msg, Session producerSession) throws JMSException
{
BytesMessage send = producerSession.createBytesMessage();
@@ -466,22 +419,19 @@ public class ProducerFlowControlTest ext
return send;
}
-
private class MessageSender implements Runnable
{
- private final MessageProducer _producer;
- private final Session _producerSession;
+ private final MessageProducer _senderProducer;
+ private final Session _senderSession;
private final int _numMessages;
-
-
-
- private JMSException _exception;
+ private volatile JMSException _exception;
+ private CountDownLatch _exceptionThrownLatch = new CountDownLatch(1);
private long _sleepPeriod;
public MessageSender(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod)
{
- _producer = producer;
- _producerSession = producerSession;
+ _senderProducer = producer;
+ _senderSession = producerSession;
_numMessages = numMessages;
_sleepPeriod = sleepPeriod;
}
@@ -490,16 +440,18 @@ public class ProducerFlowControlTest ext
{
try
{
- sendMessages(_producer, _producerSession, _numMessages, _sleepPeriod);
+ sendMessages(_senderProducer, _senderSession, _numMessages, _sleepPeriod);
}
catch (JMSException e)
{
_exception = e;
+ _exceptionThrownLatch.countDown();
}
}
- public JMSException getException()
+ public Exception awaitSenderException(long timeout) throws InterruptedException
{
+ _exceptionThrownLatch.await(timeout, TimeUnit.MILLISECONDS);
return _exception;
}
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java Sat Mar 10 19:22:10 2012
@@ -21,6 +21,11 @@
package org.apache.qpid.server.queue;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -29,11 +34,6 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
/**
* Test Case to ensure that messages are correctly returned.
* This includes checking:
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java Sat Mar 10 19:22:10 2012
@@ -19,11 +19,15 @@
*/
package org.apache.qpid.server.queue;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
import javax.jms.Connection;
import javax.jms.JMSException;
@@ -34,14 +38,12 @@ import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.NamingException;
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicInteger;
public class SortedQueueTest extends QpidBrokerTestCase
{
@@ -54,6 +56,7 @@ public class SortedQueueTest extends Qpi
private Connection _producerConnection;
private Session _producerSession;
private Connection _consumerConnection;
+ private long _receiveInterval;
protected void setUp() throws Exception
{
@@ -64,6 +67,7 @@ public class SortedQueueTest extends Qpi
_producerConnection = getConnection();
_consumerConnection = getConnection();
_producerSession = _producerConnection.createSession(true, Session.SESSION_TRANSACTED);
+ _receiveInterval = isBrokerStorePersistent() ? 3000l : 1500l;
}
protected void tearDown() throws Exception
@@ -94,7 +98,7 @@ public class SortedQueueTest extends Qpi
_consumerConnection.start();
TextMessage received;
int messageCount = 0;
- while((received = (TextMessage) consumer.receive(1000)) != null)
+ while((received = (TextMessage) consumer.receive(_receiveInterval)) != null)
{
assertEquals("Received message with unexpected sorted key value", VALUES_SORTED[messageCount],
received.getStringProperty(TEST_SORT_KEY));
@@ -138,17 +142,15 @@ public class SortedQueueTest extends Qpi
_producerSession.commit();
}
- synchronized(consumerThread)
+ try
{
- try
- {
- consumerThread.join(5000L);
- }
- catch(InterruptedException e)
- {
- fail("Test failed waiting for consumer to complete");
- }
+ consumerThread.join(getConsumerThreadJoinInterval());
}
+ catch(InterruptedException e)
+ {
+ fail("Test failed waiting for consumer to complete");
+ }
+
assertTrue("Consumer timed out", consumerThread.isStopped());
assertEquals("Incorrect number of messages received", VALUES.length, consumerThread.getConsumed());
@@ -172,23 +174,26 @@ public class SortedQueueTest extends Qpi
_producerSession.commit();
}
- synchronized(consumerThread)
+ try
{
- try
- {
- consumerThread.join(5000L);
- }
- catch(InterruptedException e)
- {
- fail("Test failed waiting for consumer to complete");
- }
+ consumerThread.join(getConsumerThreadJoinInterval());
}
+ catch(InterruptedException e)
+ {
+ fail("Test failed waiting for consumer to complete");
+ }
+
assertTrue("Consumer timed out", consumerThread.isStopped());
assertEquals("Incorrect number of messages received", 200, consumerThread.getConsumed());
producer.close();
}
+ private long getConsumerThreadJoinInterval()
+ {
+ return isBrokerStorePersistent() ? 50000L: 5000L;
+ }
+
public void testSortOrderWithNonUniqueKeys() throws JMSException, NamingException, AMQException
{
final Queue queue = createQueue();
@@ -211,7 +216,7 @@ public class SortedQueueTest extends Qpi
TextMessage received = null;
int messageCount = 0;
- while((received = (TextMessage) consumer.receive(1000)) != null)
+ while((received = (TextMessage) consumer.receive(_receiveInterval)) != null)
{
assertEquals("Received message with unexpected sorted key value", "samesortkeyvalue",
received.getStringProperty(TEST_SORT_KEY));
@@ -247,7 +252,7 @@ public class SortedQueueTest extends Qpi
TextMessage received;
int messageCount = 0;
- while((received = (TextMessage) consumer.receive(1000)) != null)
+ while((received = (TextMessage) consumer.receive(_receiveInterval)) != null)
{
assertEquals("Received message with unexpected sorted key value", SUBSET_KEYS[messageCount / 10],
received.getStringProperty(TEST_SORT_KEY));
@@ -362,16 +367,16 @@ public class SortedQueueTest extends Qpi
private Message assertReceiveMessage(final MessageConsumer consumer)
throws JMSException
{
- final Message received = (TextMessage) consumer.receive(10000);
+ final Message received = (TextMessage) consumer.receive(_receiveInterval);
assertNotNull("Received message is unexpectedly null", received);
return received;
}
private class TestConsumerThread extends Thread
{
- private boolean _stopped = false;
+ private final AtomicInteger _consumed = new AtomicInteger(0);
+ private volatile boolean _stopped = false;
private int _count = 0;
- private int _consumed = 0;
private int _sessionType = Session.AUTO_ACKNOWLEDGE;
private Queue _queue;
@@ -402,7 +407,7 @@ public class SortedQueueTest extends Qpi
conn.start();
Message msg;
- while((msg = consumer.receive(1000)) != null)
+ while((msg = consumer.receive(_receiveInterval)) != null)
{
if(_sessionType == Session.SESSION_TRANSACTED)
{
@@ -415,7 +420,7 @@ public class SortedQueueTest extends Qpi
{
LOGGER.debug("transacted session commit");
session.commit();
- _consumed++;
+ _consumed.incrementAndGet();
}
}
else if(_sessionType == Session.CLIENT_ACKNOWLEDGE)
@@ -429,18 +434,18 @@ public class SortedQueueTest extends Qpi
{
LOGGER.debug("client ack session acknowledge");
msg.acknowledge();
- _consumed++;
+ _consumed.incrementAndGet();
}
}
else
{
LOGGER.debug("auto ack session");
- _consumed++;
+ _consumed.incrementAndGet();
}
_count++;
LOGGER.debug("Message consumed with key: " + msg.getStringProperty(TEST_SORT_KEY));
- LOGGER.debug("Message consumed with consumed index: " + _consumed);
+ LOGGER.debug("Message consumed with consumed index: " + _consumed.get());
}
_stopped = true;
@@ -453,14 +458,14 @@ public class SortedQueueTest extends Qpi
}
}
- public synchronized boolean isStopped()
+ public boolean isStopped()
{
return _stopped;
}
- public synchronized int getConsumed()
+ public int getConsumed()
{
- return _consumed;
+ return _consumed.get();
}
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java Sat Mar 10 19:22:10 2012
@@ -21,8 +21,15 @@
package org.apache.qpid.server.queue;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
+import junit.framework.Assert;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
import javax.jms.Connection;
import javax.jms.JMSException;
@@ -32,16 +39,8 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TopicSubscriber;
-
-import junit.framework.Assert;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
public class TimeToLiveTest extends QpidBrokerTestCase
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org