You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/11/28 07:38:27 UTC
[1/7] qpid-broker-j git commit: QPID-6933: [System Tests] Add module
for JMS 1.1 system tests and start moving JMS 1.1 tests into it
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 031e00611 -> 59218fdc3
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/59218fdc/systests/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/systests/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
index 8b34152..cd9bd7d 100644
--- a/systests/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
+++ b/systests/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
@@ -47,12 +47,8 @@ import org.apache.qpid.client.AMQSession;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
/**
- * @todo Code to check that a consumer gets only one particular method could be factored into a re-usable method (as
- * a static on a base test helper class, e.g. TestUtils.
- *
- * @todo Code to create test end-points using session per connection, or all sessions on one connection, to be factored
- * out to make creating this test variation simpler. Want to make this variation available through LocalCircuit,
- * driven by the test model.
+ * The tests in the suite only test 0-x client specific behaviour.
+ * The tests should be moved into client or removed
*/
public class DurableSubscriptionTest extends QpidBrokerTestCase
{
@@ -62,80 +58,6 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
private static final String MY_SUBSCRIPTION = "MySubscription";
- public void testUnsubscribe() throws Exception
- {
- TopicConnection con = (TopicConnection) getConnection();
- Topic topic = createTopic(con, "MyDurableSubscriptionTestTopic");
- LOGGER.info("Create Session 1");
- Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
- LOGGER.info("Create Consumer on Session 1");
- MessageConsumer consumer1 = session1.createConsumer(topic);
- LOGGER.info("Create Producer on Session 1");
- MessageProducer producer = session1.createProducer(topic);
-
- LOGGER.info("Create Session 2");
- Session session2 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
- LOGGER.info("Create Durable Subscriber on Session 2");
- TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, MY_SUBSCRIPTION);
-
- LOGGER.info("Starting connection");
- con.start();
-
- LOGGER.info("Producer sending message A");
- producer.send(session1.createTextMessage("A"));
-
- //check the dur sub's underlying queue now has msg count 1
- AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" + ":" + MY_SUBSCRIPTION);
- assertEquals("Msg count should be 1", 1, ((AMQSession<?, ?>) session1).getQueueDepth(subQueue, true));
-
- Message msg;
- LOGGER.info("Receive message on consumer 1:expecting A");
- msg = consumer1.receive(getReceiveTimeout());
- assertNotNull("Message should have been received",msg);
- assertEquals("A", ((TextMessage) msg).getText());
- LOGGER.info("Receive message on consumer 1 :expecting null");
- msg = consumer1.receive(getShortReceiveTimeout());
- assertEquals(null, msg);
-
- LOGGER.info("Receive message on consumer 2:expecting A");
- msg = consumer2.receive(getReceiveTimeout());
- assertNotNull("Message should have been received",msg);
- assertEquals("A", ((TextMessage) msg).getText());
- msg = consumer2.receive(getShortReceiveTimeout());
- LOGGER.info("Receive message on consumer 1 :expecting null");
- assertEquals(null, msg);
-
- //check the dur sub's underlying queue now has msg count 0
- assertEquals("Msg count should be 0", 0, ((AMQSession<?, ?>) session2).getQueueDepth(subQueue, true));
-
- consumer2.close();
- LOGGER.info("Unsubscribe session2/consumer2");
- session2.unsubscribe(MY_SUBSCRIPTION);
-
- ((AMQSession<?, ?>) session2).sync();
-
- if(isJavaBroker())
- {
- assertFalse("Queue " + subQueue + " exists", ((AMQSession<?, ?>) session2).isQueueBound(subQueue));
- }
-
- //verify unsubscribing the durable subscriber did not affect the non-durable one
- LOGGER.info("Producer sending message B");
- producer.send(session1.createTextMessage("B"));
-
- LOGGER.info("Receive message on consumer 1 :expecting B");
- msg = consumer1.receive(getReceiveTimeout());
- assertNotNull("Message should have been received",msg);
- assertEquals("B", ((TextMessage) msg).getText());
- LOGGER.info("Receive message on consumer 1 :expecting null");
- msg = consumer1.receive(getShortReceiveTimeout());
- assertEquals(null, msg);
-
- LOGGER.info("Close connection");
- con.close();
- }
-
-
/**
* Specifically uses a subscriber with a selector because QPID-4731 found that selectors
* can prevent queue removal.
@@ -514,171 +436,6 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
session.unsubscribe("testDurableWithInvalidDestinationsub");
}
-
- /**
- * Creates a durable subscription with a selector, then changes that selector on resubscription
- * <p>
- * QPID-1202, QPID-2418
- */
- public void testResubscribeWithChangedSelector() throws Exception
- {
- Connection conn = getConnection();
- conn.start();
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Topic topic = createTopic(conn, "testResubscribeWithChangedSelector");
- MessageProducer producer = session.createProducer(topic);
-
- // Create durable subscriber that matches A
- TopicSubscriber subA = session.createDurableSubscriber(topic,
- "testResubscribeWithChangedSelector",
- "Match = True", false);
-
- // Send 1 matching message and 1 non-matching message
- sendMatchingAndNonMatchingMessage(session, producer);
-
- Message rMsg = subA.receive(getShortReceiveTimeout());
- assertNotNull(rMsg);
- assertEquals("Content was wrong",
- "testResubscribeWithChangedSelector1",
- ((TextMessage) rMsg).getText());
-
- rMsg = subA.receive(getShortReceiveTimeout());
- assertNull(rMsg);
-
- // Disconnect subscriber
- subA.close();
-
- // Reconnect with new selector that matches B
- TopicSubscriber subB = session.createDurableSubscriber(topic,
- "testResubscribeWithChangedSelector","Match = False", false);
-
- //verify no messages are now received.
- rMsg = subB.receive(getShortReceiveTimeout());
- assertNull("Should not have received message as the selector was changed", rMsg);
-
- // Check that new messages are received properly
- sendMatchingAndNonMatchingMessage(session, producer);
- rMsg = subB.receive(getReceiveTimeout());
-
- assertNotNull("Message should have been received", rMsg);
- assertEquals("Content was wrong",
- "testResubscribeWithChangedSelector2",
- ((TextMessage) rMsg).getText());
-
-
- rMsg = subB.receive(getShortReceiveTimeout());
- assertNull("Message should not have been received",rMsg);
- session.unsubscribe("testResubscribeWithChangedSelector");
- }
-
- public void testDurableSubscribeWithTemporaryTopic() throws Exception
- {
- Connection conn = getConnection();
- conn.start();
- Session ssn = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Topic topic = ssn.createTemporaryTopic();
- try
- {
- ssn.createDurableSubscriber(topic, "test");
- fail("expected InvalidDestinationException");
- }
- catch (InvalidDestinationException ex)
- {
- // this is expected
- }
- try
- {
- ssn.createDurableSubscriber(topic, "test", null, false);
- fail("expected InvalidDestinationException");
- }
- catch (InvalidDestinationException ex)
- {
- // this is expected
- }
- }
-
- private void sendMatchingAndNonMatchingMessage(Session session, MessageProducer producer) throws JMSException
- {
- TextMessage msg = session.createTextMessage("testResubscribeWithChangedSelector1");
- msg.setBooleanProperty("Match", true);
- producer.send(msg);
- msg = session.createTextMessage("testResubscribeWithChangedSelector2");
- msg.setBooleanProperty("Match", false);
- producer.send(msg);
- }
-
-
- /**
- * create and register a durable subscriber with a message selector and then close it
- * create a publisher and send 5 right messages and 5 wrong messages
- * create another durable subscriber with the same selector and name
- * check messages are still there
- * <p>
- * QPID-2418
- */
- public void testDurSubSameMessageSelector() throws Exception
- {
- Connection conn = getConnection();
- conn.start();
- Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
- Topic topic = createTopic(conn, "sameMessageSelector");
-
- //create and register a durable subscriber with a message selector and then close it
- TopicSubscriber subOne = session.createDurableSubscriber(topic, "sameMessageSelector", "testprop = TRUE", false);
- subOne.close();
-
- MessageProducer producer = session.createProducer(topic);
- for (int i = 0; i < 5; i++)
- {
- Message message = session.createMessage();
- message.setBooleanProperty("testprop", true);
- producer.send(message);
- message = session.createMessage();
- message.setBooleanProperty("testprop", false);
- producer.send(message);
- }
- session.commit();
- producer.close();
-
- // should be 5 or 10 messages on queue now
- // (5 for the Apache Qpid Broker-J due to use of server side selectors, and 10 for the cpp broker due to client side selectors only)
- AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "sameMessageSelector");
- assertEquals("Queue depth is wrong", isJavaBroker() ? 5 : 10, ((AMQSession<?, ?>) session).getQueueDepth(queue, true));
-
- // now recreate the durable subscriber and check the received messages
- TopicSubscriber subTwo = session.createDurableSubscriber(topic, "sameMessageSelector", "testprop = TRUE", false);
-
- for (int i = 0; i < 5; i++)
- {
- Message message = subTwo.receive(getReceiveTimeout());
- if (message == null)
- {
- fail("sameMessageSelector test failed. no message was returned");
- }
- else
- {
- assertEquals("sameMessageSelector test failed. message selector not reset",
- "true", message.getStringProperty("testprop"));
- }
- }
-
- session.commit();
-
- // Check queue has no messages
- if (isJavaBroker())
- {
- assertEquals("Queue should be empty", 0, ((AMQSession<?, ?>) session).getQueueDepth(queue));
- }
- else
- {
- assertTrue("At most the queue should have only 1 message", ((AMQSession<?, ?>) session).getQueueDepth(queue) <= 1);
- }
-
- // Unsubscribe
- session.unsubscribe("sameMessageSelector");
-
- conn.close();
- }
/**
* <ul>
@@ -690,6 +447,9 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
* </ul>
* <p>
* QPID-2418
+ *
+ * TODO: it seems that client behaves in not jms spec compliant:
+ * the client allows subscription recreation with a new selector whilst an active subscriber is connected
*/
public void testResubscribeWithChangedSelectorNoClose() throws Exception
{
@@ -762,6 +522,9 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
* </ul>
* <p>
* QPID-2418
+ *
+ * TODO: it seems that client behaves in not jms spec compliant:
+ * the client allows subscription recreation with a new selector whilst active subscriber is connected
*/
public void testDurSubAddMessageSelectorNoClose() throws Exception
{
@@ -820,176 +583,4 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
conn.close();
}
- /**
- * <ul>
- * <li>create and register a durable subscriber with no message selector
- * <li>try to create another durable with the same name, should fail
- * </ul>
- * <p>
- * QPID-2418
- */
- public void testDurSubNoSelectorResubscribeNoClose() throws Exception
- {
- Connection conn = getConnection();
- conn.start();
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Topic topic = createTopic(conn, "subscriptionName");
-
- // create and register a durable subscriber with no message selector
- session.createDurableSubscriber(topic, "subscriptionName", null, false);
-
- // try to recreate the durable subscriber
- try
- {
- session.createDurableSubscriber(topic, "subscriptionName", null, false);
- fail("Subscription should not have been created");
- }
- catch (Exception e)
- {
- LOGGER.error("Error creating durable subscriber",e);
- }
- }
-
- /**
- * Tests that a subscriber created on a same <i>session</i> as producer with
- * no local true does not receive messages.
- */
- public void testNoLocalOnSameSession() throws Exception
- {
- Connection connection = getConnection();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Topic topic = session.createTopic(getTestQueueName());
- MessageProducer producer = session.createProducer(topic);
- TopicSubscriber subscriber = null;
- try
- {
- subscriber = session.createDurableSubscriber(topic, getTestName(), null, true);
- connection.start();
-
- producer.send(createNextMessage(session, 1));
-
- Message m = subscriber.receive(getShortReceiveTimeout());
- assertNull("Unexpected message received", m);
- }
- finally
- {
- session.unsubscribe(getTestName());
- }
- }
-
-
- /**
- * Tests that a subscriber created on a same <i>connection</i> but separate
- * <i>sessionM</i> as producer with no local true does not receive messages.
- */
- public void testNoLocalOnSameConnection() throws Exception
- {
- Connection connection = getConnection();
-
- Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Topic topic = consumerSession.createTopic(getTestQueueName());
- MessageProducer producer = producerSession.createProducer(topic);
-
- TopicSubscriber subscriber = null;
- try
- {
- subscriber = consumerSession.createDurableSubscriber(topic, getTestName(), null, true);
- connection.start();
-
- producer.send(createNextMessage(producerSession, 1));
-
- Message m = subscriber.receive(getShortReceiveTimeout());
- assertNull("Unexpected message received", m);
- }
- finally
- {
- consumerSession.unsubscribe(getTestName());
- }
- }
-
- /**
- * Tests that if no-local is in use, that the messages are delivered when
- * the client reconnects.
- *
- * Currently fails on the Apache Qpid Broker-J due to QPID-3605.
- */
- public void testNoLocalMessagesNotDeliveredAfterReconnection() throws Exception
- {
- Connection connection = getConnection();
-
- Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Topic topic = consumerSession.createTopic(getTestQueueName());
- MessageProducer producer = producerSession.createProducer(topic);
-
- TopicSubscriber subscriber = null;
- try
- {
- subscriber = consumerSession.createDurableSubscriber(topic, getTestName(), null, true);
- connection.start();
-
- producer.send(createNextMessage(producerSession, 1));
-
- Message m = subscriber.receive(getShortReceiveTimeout());
- assertNull("Unexpected message received", m);
-
- connection.close();
-
- connection = getConnection();
-
- consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- subscriber = consumerSession.createDurableSubscriber(topic, getTestName(), null, true);
- connection.start();
- m = subscriber.receive(getShortReceiveTimeout());
- assertNull("Message should not be received on a new connection", m);
- }
- finally
- {
- consumerSession.unsubscribe(getTestName());
- }
- }
-
- /**
- * Tests that messages are delivered normally to a subscriber on a separate connection despite
- * the use of durable subscriber with no-local on the first connection.
- */
- public void testNoLocalSubscriberAndSubscriberOnSeparateConnection() throws Exception
- {
- Connection noLocalConnection = getConnection();
- Connection connection = getConnection();
-
- String noLocalSubId1 = getTestName() + "subId1";
- String subId = getTestName() + "subId2";
-
- Session noLocalSession = noLocalConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Topic noLocalTopic = noLocalSession.createTopic(getTestQueueName());
-
- Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Topic topic = consumerSession.createTopic(getTestQueueName());
-
- TopicSubscriber noLocalSubscriber = null;
- TopicSubscriber subscriber = null;
- try
- {
- MessageProducer producer = noLocalSession.createProducer(noLocalTopic);
- noLocalSubscriber = noLocalSession.createDurableSubscriber(noLocalTopic, noLocalSubId1, null, true);
- subscriber = consumerSession.createDurableSubscriber(topic, subId, null, true);
- noLocalConnection.start();
- connection.start();
-
- producer.send(createNextMessage(noLocalSession, 1));
-
- Message m1 = noLocalSubscriber.receive(getShortReceiveTimeout());
- assertNull("Subscriber on nolocal connection should not receive message", m1);
-
- Message m2 = subscriber.receive(getShortReceiveTimeout());
- assertNotNull("Subscriber on non-nolocal connection should receive message", m2);
- }
- finally
- {
- noLocalSession.unsubscribe(noLocalSubId1);
- consumerSession.unsubscribe(subId);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/59218fdc/test-profiles/CPPTransientExcludes
----------------------------------------------------------------------
diff --git a/test-profiles/CPPTransientExcludes b/test-profiles/CPPTransientExcludes
index 0605cb8..e30e9d6 100644
--- a/test-profiles/CPPTransientExcludes
+++ b/test-profiles/CPPTransientExcludes
@@ -17,9 +17,6 @@
// under the License.
//
-// those tests need durable subscribe states to be persisted
-org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testDurSubRestoredAfterNonPersistentMessageSent
-
// those tests require broker recovery
org.apache.qpid.test.unit.ct.DurableSubscriberTest#*
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/59218fdc/test-profiles/JavaPre010Excludes
----------------------------------------------------------------------
diff --git a/test-profiles/JavaPre010Excludes b/test-profiles/JavaPre010Excludes
index d719137..76bd08c 100644
--- a/test-profiles/JavaPre010Excludes
+++ b/test-profiles/JavaPre010Excludes
@@ -63,9 +63,6 @@ org.apache.qpid.test.unit.client.connection.ConnectionTest#testClientIDVerificat
// QPID-3604 This fix is applied only to the 0-10 code, hence this test does not work for pre 0-10.
org.apache.qpid.client.prefetch.PrefetchBehaviourTest#testConnectionStop
-//Tests durable subscription selector verification behaviour that 0-8/0-9/0-9-1 cant provide
-org.apache.qpid.test.unit.ct.DurableSubscriberTest#testResubscribeWithChangedSelectorAndRestart
-
// QPID-3396
org.apache.qpid.test.unit.client.connection.ConnectionTest#testExceptionWhenUserPassIsRequired
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[7/7] qpid-broker-j git commit: QPID-8055: [Performance Tests] Remove
hardcoded reference to legacy jms client context factory from perftests
runner
Posted by or...@apache.org.
QPID-8055: [Performance Tests] Remove hardcoded reference to legacy jms client context factory from perftests runner
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/0b7666eb
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/0b7666eb
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/0b7666eb
Branch: refs/heads/master
Commit: 0b7666ebe869de6cfdc34e13ff9d6c8e18c0cd5e
Parents: 9089861
Author: Alex Rudyy <or...@apache.org>
Authored: Fri Nov 24 14:21:04 2017 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Tue Nov 28 07:37:08 2017 +0000
----------------------------------------------------------------------
.../apache/qpid/disttest/AbstractRunner.java | 38 +++++++++++++++-----
1 file changed, 29 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0b7666eb/perftests/src/main/java/org/apache/qpid/disttest/AbstractRunner.java
----------------------------------------------------------------------
diff --git a/perftests/src/main/java/org/apache/qpid/disttest/AbstractRunner.java b/perftests/src/main/java/org/apache/qpid/disttest/AbstractRunner.java
index 1c9366a..48f1bc5 100644
--- a/perftests/src/main/java/org/apache/qpid/disttest/AbstractRunner.java
+++ b/perftests/src/main/java/org/apache/qpid/disttest/AbstractRunner.java
@@ -20,9 +20,16 @@
*/
package org.apache.qpid.disttest;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
+import java.util.Properties;
import javax.naming.Context;
import javax.naming.InitialContext;
@@ -41,19 +48,19 @@ public class AbstractRunner
protected Context getContext()
{
String jndiConfig = getJndiConfig();
- Hashtable env = new Hashtable();
- env.put(Context.PROVIDER_URL, jndiConfig);
- // Java allows this to be overridden with a system property of the same name
- if (!System.getProperties().containsKey(InitialContext.INITIAL_CONTEXT_FACTORY))
- {
- env.put(InitialContext.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
- }
try
{
- return new InitialContext(env);
+ Properties properties = new Properties();
+ properties.put(Context.PROVIDER_URL, jndiConfig);
+ try(InputStream is = getJndiConfigurationInputStream(jndiConfig))
+ {
+ properties.load(is);
+ }
+
+ return new InitialContext(properties);
}
- catch (NamingException e)
+ catch (IOException | NamingException e)
{
throw new DistributedTestException("Exception whilst creating InitialContext from URL '"
+ jndiConfig + "'", e);
@@ -75,4 +82,17 @@ public class AbstractRunner
{
return _cliOptions;
}
+
+ private InputStream getJndiConfigurationInputStream(final String providerUrl) throws IOException
+ {
+ try
+ {
+ URL url = new URL(providerUrl);
+ return url.openStream();
+ }
+ catch (MalformedURLException mue)
+ {
+ return new FileInputStream(new File(providerUrl));
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[4/7] qpid-broker-j git commit: QPID-6933: [System Tests] Change JMS
2.0 system tests to start broker per test suite
Posted by or...@apache.org.
QPID-6933: [System Tests] Change JMS 2.0 system tests to start broker per test suite
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/6dc32330
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/6dc32330
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/6dc32330
Branch: refs/heads/master
Commit: 6dc32330fc7309650d7feaff8981ae72ddde81df
Parents: 031e006
Author: Alex Rudyy <or...@apache.org>
Authored: Thu Nov 23 13:27:40 2017 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Tue Nov 28 07:37:08 2017 +0000
----------------------------------------------------------------------
systests/qpid-systests-jms_2.0/pom.xml | 19 +
.../qpid/systests/jms_2_0/Jms2TestBase.java | 144 +++++++
.../org/apache/qpid/systests/jms_2_0/Utils.java | 97 +++++
.../src/main/resources/config-jms2-tests.json | 98 +++++
.../jms_2_0/connection/ConnectionTest.java | 9 +-
.../deliverycount/DeliveryCountTest.java | 38 +-
.../deliverydelay/DeliveryDelayTest.java | 134 +++---
.../subscription/SharedSubscriptionTest.java | 429 ++++++++++---------
.../qpid/test/utils/AmqpManagementFacade.java | 26 +-
.../qpid/test/utils/ConnectionBuilder.java | 6 +-
.../qpid/test/utils/QpidBrokerTestCase.java | 5 +-
.../utils/QpidJmsClient0xConnectionBuilder.java | 34 +-
.../utils/QpidJmsClientConnectionBuilder.java | 76 +++-
.../utils/EmbeddedBrokerPerClassAdminImpl.java | 2 +-
14 files changed, 779 insertions(+), 338 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/6dc32330/systests/qpid-systests-jms_2.0/pom.xml
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_2.0/pom.xml b/systests/qpid-systests-jms_2.0/pom.xml
index ec664e1..9db61d4 100644
--- a/systests/qpid-systests-jms_2.0/pom.xml
+++ b/systests/qpid-systests-jms_2.0/pom.xml
@@ -69,6 +69,11 @@
<optional>true</optional>
</dependency>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-systests-utils</artifactId>
+ </dependency>
+
</dependencies>
<profiles>
@@ -111,4 +116,18 @@
</profiles>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <systemPropertyVariables>
+ <qpid.initialConfigurationLocation>classpath:config-jms2-tests.json</qpid.initialConfigurationLocation>
+ </systemPropertyVariables>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
</project>
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/6dc32330/systests/qpid-systests-jms_2.0/src/main/java/org/apache/qpid/systests/jms_2_0/Jms2TestBase.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_2.0/src/main/java/org/apache/qpid/systests/jms_2_0/Jms2TestBase.java b/systests/qpid-systests-jms_2.0/src/main/java/org/apache/qpid/systests/jms_2_0/Jms2TestBase.java
new file mode 100644
index 0000000..e13a9d9
--- /dev/null
+++ b/systests/qpid-systests-jms_2.0/src/main/java/org/apache/qpid/systests/jms_2_0/Jms2TestBase.java
@@ -0,0 +1,144 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systests.jms_2_0;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.JMSRuntimeException;
+import javax.jms.Session;
+import javax.naming.NamingException;
+
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+
+import org.apache.qpid.test.utils.AmqpManagementFacade;
+import org.apache.qpid.test.utils.ConnectionBuilder;
+import org.apache.qpid.test.utils.JmsProvider;
+import org.apache.qpid.test.utils.QpidJmsClientProvider;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.url.URLSyntaxException;
+
+public abstract class Jms2TestBase extends BrokerAdminUsingTestBase
+{
+ private static JmsProvider _jmsProvider;
+ private static final AmqpManagementFacade _managementFacade = new AmqpManagementFacade("$management");
+
+ @Rule
+ public final TestName _testName = new TestName();
+ private final List<Connection> _connections = new ArrayList<>();
+
+ @BeforeClass
+ public static void setUpTestBase()
+ {
+ _jmsProvider = new QpidJmsClientProvider(_managementFacade);
+ }
+
+ @After
+ public void tearDown()
+ {
+ List<JMSException> exceptions = new ArrayList<>();
+ for (Connection connection : _connections)
+ {
+ try
+ {
+ connection.close();
+ }
+ catch (JMSException e)
+ {
+ exceptions.add(e);
+ }
+ }
+ if (!exceptions.isEmpty())
+ {
+ JMSRuntimeException jmsRuntimeException = new JMSRuntimeException("Exception(s) occurred during closing of JMS connections.");
+ for (JMSException exception : exceptions)
+ {
+ jmsRuntimeException.addSuppressed(exception);
+ }
+ throw jmsRuntimeException;
+ }
+ }
+
+ protected ConnectionBuilder getConnectionBuilder()
+ {
+ InetSocketAddress brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
+ return _jmsProvider.getConnectionBuilder()
+ .setHost(brokerAddress.getHostName())
+ .setPort(brokerAddress.getPort())
+ .setUsername(getBrokerAdmin().getValidUsername())
+ .setPassword(getBrokerAdmin().getValidPassword());
+ }
+
+ protected void createEntityUsingAmqpManagement(final String entityName,
+ final String entityType,
+ final Map<String, Object> attributes)
+ throws Exception
+ {
+ try (Connection connection = getConnection())
+ {
+ connection.start();
+ Session session = connection.createSession(Session.CLIENT_ACKNOWLEDGE);
+ _managementFacade.createEntityUsingAmqpManagement(entityName, session, entityType, attributes);
+ }
+ }
+
+ protected Object performOperationUsingAmqpManagement(final String name,
+ final String operation,
+ final String type,
+ Map<String, Object> arguments)
+ throws Exception
+ {
+ try (Connection connection = getConnection())
+ {
+ connection.start();
+ Session session = connection.createSession(Session.CLIENT_ACKNOWLEDGE);
+ return _managementFacade.performOperationUsingAmqpManagement(name, operation, session, type, arguments);
+ }
+ }
+
+ protected Connection getConnection() throws JMSException, NamingException, URLSyntaxException
+ {
+ return getConnectionBuilder().build();
+ }
+
+ protected long getReceiveTimeout()
+ {
+ return Long.getLong("qpid.test_receive_timeout", 1000L);
+ }
+
+ protected String getVirtualHostName()
+ {
+ return getClass().getSimpleName() + "_" + _testName.getMethodName();
+ }
+
+ protected String getTestName()
+ {
+ return _testName.getMethodName();
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/6dc32330/systests/qpid-systests-jms_2.0/src/main/java/org/apache/qpid/systests/jms_2_0/Utils.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_2.0/src/main/java/org/apache/qpid/systests/jms_2_0/Utils.java b/systests/qpid-systests-jms_2.0/src/main/java/org/apache/qpid/systests/jms_2_0/Utils.java
new file mode 100644
index 0000000..4e5f76f
--- /dev/null
+++ b/systests/qpid-systests-jms_2.0/src/main/java/org/apache/qpid/systests/jms_2_0/Utils.java
@@ -0,0 +1,97 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systests.jms_2_0;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+public class Utils
+{
+ private static final int DEFAULT_MESSAGE_SIZE = 1024;
+ public static final String INDEX = "index";
+ private static final String DEFAULT_MESSAGE_PAYLOAD = createString(DEFAULT_MESSAGE_SIZE);
+
+ public static List<Message> sendMessage(Session session, Destination destination, int count) throws Exception
+ {
+ List<Message> messages = new ArrayList<>(count);
+ MessageProducer producer = session.createProducer(destination);
+
+ for (int i = 0; i < (count); i++)
+ {
+ Message next = createNextMessage(session, i);
+ producer.send(next);
+ messages.add(next);
+ }
+
+ if (session.getTransacted())
+ {
+ session.commit();
+ }
+
+ return messages;
+ }
+
+ public static Message createNextMessage(Session session, int msgCount) throws JMSException
+ {
+ Message message = createMessage(session, DEFAULT_MESSAGE_SIZE);
+ message.setIntProperty(INDEX, msgCount);
+
+ return message;
+ }
+
+ public static Message createMessage(Session session, int messageSize) throws JMSException
+ {
+ String payload;
+ if (messageSize == DEFAULT_MESSAGE_SIZE)
+ {
+ payload = DEFAULT_MESSAGE_PAYLOAD;
+ }
+ else
+ {
+ payload = createString(messageSize);
+ }
+
+ return session.createTextMessage(payload);
+ }
+
+ private static String createString(final int stringSize)
+ {
+ final String payload;
+ StringBuilder stringBuilder = new StringBuilder();
+ for (int i = 0; i < stringSize; ++i)
+ {
+ stringBuilder.append("x");
+ }
+ payload = stringBuilder.toString();
+ return payload;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/6dc32330/systests/qpid-systests-jms_2.0/src/main/resources/config-jms2-tests.json
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_2.0/src/main/resources/config-jms2-tests.json b/systests/qpid-systests-jms_2.0/src/main/resources/config-jms2-tests.json
new file mode 100644
index 0000000..764ff89
--- /dev/null
+++ b/systests/qpid-systests-jms_2.0/src/main/resources/config-jms2-tests.json
@@ -0,0 +1,98 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+{
+ "name" : "${broker.name}",
+ "modelVersion" : "7.0",
+ "authenticationproviders" : [ {
+ "name" : "anon",
+ "type" : "Anonymous"
+ }, {
+ "name" : "plain",
+ "type" : "Plain",
+ "secureOnlyMechanisms" : [],
+ "users" : [ {
+ "name" : "admin",
+ "type" : "managed",
+ "password" : "admin"
+ }, {
+ "name" : "guest",
+ "type" : "managed",
+ "password" : "guest"
+ } ]
+ } ],
+ "ports" : [ {
+ "name" : "AMQP",
+ "type" : "AMQP",
+ "authenticationProvider" : "plain",
+ "port" : "0",
+ "protocols" : [ "AMQP_1_0" ],
+ "virtualhostaliases" : [ {
+ "name" : "defaultAlias",
+ "type" : "defaultAlias"
+ }, {
+ "name" : "hostnameAlias",
+ "type" : "hostnameAlias"
+ }, {
+ "name" : "nameAlias",
+ "type" : "nameAlias"
+ } ]
+ }, {
+ "name" : "ANONYMOUS_AMQP",
+ "type" : "AMQP",
+ "authenticationProvider" : "anon",
+ "port" : "0",
+ "protocols" : [ "AMQP_1_0" ],
+ "virtualhostaliases" : [ {
+ "name" : "defaultAlias",
+ "type" : "defaultAlias",
+ "durable" : true
+ }, {
+ "name" : "hostnameAlias",
+ "type" : "hostnameAlias",
+ "durable" : true
+ }, {
+ "name" : "nameAlias",
+ "type" : "nameAlias",
+ "durable" : true
+ } ]
+ }, {
+ "name" : "ANONYMOUS_AMQPWS",
+ "type" : "AMQP",
+ "authenticationProvider" : "anon",
+ "port" : "0",
+ "transports" : ["WS"],
+ "protocols" : [ "AMQP_1_0" ],
+ "virtualhostaliases" : [ {
+ "name" : "defaultAlias",
+ "type" : "defaultAlias",
+ "durable" : true
+ }, {
+ "name" : "hostnameAlias",
+ "type" : "hostnameAlias",
+ "durable" : true
+ }, {
+ "name" : "nameAlias",
+ "type" : "nameAlias",
+ "durable" : true
+ } ]
+ } ],
+ "virtualhostnodes" : []
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/6dc32330/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/connection/ConnectionTest.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/connection/ConnectionTest.java b/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/connection/ConnectionTest.java
index 906fe1f..3472620 100644
--- a/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/connection/ConnectionTest.java
+++ b/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/connection/ConnectionTest.java
@@ -20,12 +20,17 @@
package org.apache.qpid.systests.jms_2_0.connection;
+import static org.junit.Assert.assertNotNull;
+
import javax.jms.Connection;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.junit.Test;
+
+import org.apache.qpid.systests.jms_2_0.Jms2TestBase;
-public class ConnectionTest extends QpidBrokerTestCase
+public class ConnectionTest extends Jms2TestBase
{
+ @Test
public void testConnection() throws Exception
{
Connection con = getConnection();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/6dc32330/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/deliverycount/DeliveryCountTest.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/deliverycount/DeliveryCountTest.java b/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/deliverycount/DeliveryCountTest.java
index f1cbe41..85a6bad 100644
--- a/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/deliverycount/DeliveryCountTest.java
+++ b/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/deliverycount/DeliveryCountTest.java
@@ -19,6 +19,10 @@ package org.apache.qpid.systests.jms_2_0.deliverycount;/*
*
*/
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
import java.util.HashMap;
import java.util.Map;
@@ -30,39 +34,42 @@ import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.systests.jms_2_0.Jms2TestBase;
+import org.apache.qpid.systests.jms_2_0.Utils;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.tests.utils.BrokerAdmin;
-public class DeliveryCountTest extends QpidBrokerTestCase
+public class DeliveryCountTest extends Jms2TestBase
{
private static final int MAX_DELIVERY_ATTEMPTS = 3;
private static final String JMSX_DELIVERY_COUNT = "JMSXDeliveryCount";
private Queue _queue;
- @Override
+ @Before
public void setUp() throws Exception
{
- super.setUp();
- try (Connection connection = getConnectionWithPrefetch(0))
+ String testQueueName = BrokerAdmin.TEST_QUEUE_NAME;
+ final Map<String, Object> attributes = new HashMap<>();
+ attributes.put(org.apache.qpid.server.model.Queue.NAME, testQueueName);
+ attributes.put(org.apache.qpid.server.model.Queue.MAXIMUM_DELIVERY_ATTEMPTS, MAX_DELIVERY_ATTEMPTS);
+ createEntityUsingAmqpManagement(testQueueName, "org.apache.qpid.StandardQueue", attributes);
+ try (Connection connection = getConnectionBuilder().setPrefetch(0).build())
{
- String testQueueName = getTestQueueName();
connection.start();
Session session = connection.createSession(Session.CLIENT_ACKNOWLEDGE);
- final Map<String, Object> attributes = new HashMap<>();
- attributes.put(org.apache.qpid.server.model.Queue.NAME, testQueueName);
- attributes.put(org.apache.qpid.server.model.Queue.MAXIMUM_DELIVERY_ATTEMPTS, MAX_DELIVERY_ATTEMPTS);
- createEntityUsingAmqpManagement(testQueueName,
- session,
- "org.apache.qpid.StandardQueue",
- attributes);
_queue = session.createQueue(testQueueName);
- sendMessage(session, _queue, 1);
+ Utils.sendMessage(session, _queue, 1);
}
}
+ @Test
public void testDeliveryCountChangedOnRollback() throws Exception
{
- try (Connection connection = getConnectionWithPrefetch(0))
+ try (Connection connection = getConnectionBuilder().setPrefetch(0).build())
{
Session session = connection.createSession(JMSContext.SESSION_TRANSACTED);
MessageConsumer consumer = session.createConsumer(_queue);
@@ -78,9 +85,10 @@ public class DeliveryCountTest extends QpidBrokerTestCase
}
}
+ @Test
public void testDeliveryCountChangedOnSessionClose() throws Exception
{
- try (Connection connection = getConnectionWithPrefetch(0))
+ try (Connection connection = getConnectionBuilder().setPrefetch(0).build())
{
connection.start();
for (int i = 0; i < MAX_DELIVERY_ATTEMPTS; i++)
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/6dc32330/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/deliverydelay/DeliveryDelayTest.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/deliverydelay/DeliveryDelayTest.java b/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/deliverydelay/DeliveryDelayTest.java
index 38b09f0..20b36c1 100644
--- a/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/deliverydelay/DeliveryDelayTest.java
+++ b/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/deliverydelay/DeliveryDelayTest.java
@@ -20,55 +20,58 @@
package org.apache.qpid.systests.jms_2_0.deliverydelay;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSConsumer;
import javax.jms.JMSContext;
-import javax.jms.JMSException;
import javax.jms.JMSProducer;
import javax.jms.JMSRuntimeException;
import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-import javax.naming.NamingException;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.url.URLSyntaxException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.qpid.systests.jms_2_0.Jms2TestBase;
+import org.apache.qpid.tests.utils.BrokerAdmin;
-public class DeliveryDelayTest extends QpidBrokerTestCase
+public class DeliveryDelayTest extends Jms2TestBase
{
private static final int DELIVERY_DELAY = 3000;
- @Override
- public void setUp() throws Exception
+ @BeforeClass
+ public static void setUpClass() throws Exception
+ {
+ System.setProperty("virtualhost.housekeepingCheckPeriod", "100");
+ }
+
+ @AfterClass
+ public static void tearDownClass()
{
- setTestSystemProperty("virtualhost.housekeepingCheckPeriod", "100");
- super.setUp();
+ System.clearProperty("virtualhost.housekeepingCheckPeriod");
}
+ @Test
public void testDeliveryDelay() throws Exception
{
- try (JMSContext context = getConnectionFactory().createContext(GUEST_USERNAME, GUEST_PASSWORD);
- Connection utilityConnection = createUtilityConnection())
+ try (JMSContext context = getConnectionBuilder().buildConnectionFactory().createContext())
{
- Destination queue = createQueue(utilityConnection, getTestQueueName(), true);
+ Destination queue = createQueue(context, BrokerAdmin.TEST_QUEUE_NAME, true);
final AtomicLong messageReceiptTime = new AtomicLong();
final CountDownLatch receivedLatch = new CountDownLatch(1);
- context.createConsumer(queue).setMessageListener(new MessageListener()
- {
- @Override
- public void onMessage(final Message message)
- {
- messageReceiptTime.set(System.currentTimeMillis());
- receivedLatch.countDown();
- }
+ context.createConsumer(queue).setMessageListener(message -> {
+ messageReceiptTime.set(System.currentTimeMillis());
+ receivedLatch.countDown();
});
JMSProducer producer = context.createProducer().setDeliveryDelay(DELIVERY_DELAY);
@@ -89,12 +92,12 @@ public class DeliveryDelayTest extends QpidBrokerTestCase
* The target queue, which is addressed directly by the client, does not have
* holdsOnPublish turned on. The Broker must reject the message.
*/
+ @Test
public void testDeliveryDelayNotSupportedByQueue_MessageRejected() throws Exception
{
- try (JMSContext context = getConnectionFactory().createContext(GUEST_USERNAME, GUEST_PASSWORD);
- Connection utilityConnection = createUtilityConnection())
+ try (JMSContext context = getConnectionBuilder().buildConnectionFactory().createContext())
{
- Destination queue = createQueue(utilityConnection, getTestQueueName(), false);
+ Destination queue = createQueue(context, BrokerAdmin.TEST_QUEUE_NAME, false);
JMSProducer producer = context.createProducer().setDeliveryDelay(DELIVERY_DELAY);
try
@@ -114,17 +117,17 @@ public class DeliveryDelayTest extends QpidBrokerTestCase
* The client sends a messagge to a fanout exchange instance which is bound to a queue with
* holdsOnPublish turned off. The Broker must reject the message.
*/
+ @Test
public void testDeliveryDelayNotSupportedByQueueViaExchange_MessageRejected() throws Exception
{
- try (JMSContext context = getConnectionFactory().createContext(GUEST_USERNAME, GUEST_PASSWORD);
- Connection utilityConnection = createUtilityConnection())
+ try (JMSContext context = getConnectionBuilder().buildConnectionFactory().createContext())
{
- String testQueueName = getTestQueueName();
- String testExchangeName = getTestName() + "_exch";
+ String testQueueName = BrokerAdmin.TEST_QUEUE_NAME;
+ String testExchangeName = "test_exch";
- Destination consumeDest = createQueue(utilityConnection, testQueueName, false);
- Destination publishDest = createExchange(utilityConnection, testExchangeName);
- bindQueueToExchange(utilityConnection, testExchangeName, testQueueName);
+ Destination consumeDest = createQueue(context, testQueueName, false);
+ Destination publishDest = createExchange(context, testExchangeName);
+ bindQueueToExchange(testExchangeName, testQueueName);
JMSConsumer consumer = context.createConsumer(consumeDest);
@@ -150,56 +153,37 @@ public class DeliveryDelayTest extends QpidBrokerTestCase
}
}
- private Destination createQueue(Connection utilityConnection, String queueName, boolean holdsOnPublish) throws Exception
+ private Destination createQueue(final JMSContext context, String queueName,
+ boolean holdsOnPublish) throws Exception
{
- try (Session session = utilityConnection.createSession(Session.SESSION_TRANSACTED))
- {
- Map<String, Object> attributes = new HashMap<>();
- attributes.put(org.apache.qpid.server.model.Queue.HOLD_ON_PUBLISH_ENABLED, holdsOnPublish);
- createEntityUsingAmqpManagement(queueName,
- session,
- "org.apache.qpid.Queue",
- attributes);
- return session.createQueue(queueName);
- }
+ Map<String, Object> attributes = new HashMap<>();
+ attributes.put(org.apache.qpid.server.model.Queue.HOLD_ON_PUBLISH_ENABLED, holdsOnPublish);
+ createEntityUsingAmqpManagement(queueName,
+ "org.apache.qpid.Queue",
+ attributes);
+ return context.createQueue(queueName);
}
- private Destination createExchange(Connection utilityConnection, String exchangeName) throws Exception
+ private Destination createExchange(final JMSContext context, String exchangeName) throws Exception
{
- try (Session session = utilityConnection.createSession(Session.SESSION_TRANSACTED))
- {
- Map<String, Object> attributes = new HashMap<>();
- attributes.put(org.apache.qpid.server.model.Exchange.UNROUTABLE_MESSAGE_BEHAVIOUR, "REJECT");
- createEntityUsingAmqpManagement(exchangeName,
- session,
- "org.apache.qpid.FanoutExchange",
- attributes);
- return session.createQueue(exchangeName);
- }
+ Map<String, Object> attributes = new HashMap<>();
+ attributes.put(org.apache.qpid.server.model.Exchange.UNROUTABLE_MESSAGE_BEHAVIOUR, "REJECT");
+ createEntityUsingAmqpManagement(exchangeName,
+ "org.apache.qpid.FanoutExchange",
+ attributes);
+ return context.createQueue(exchangeName);
}
- private void bindQueueToExchange(Connection utilityConnection,
- String exchangeName,
+ private void bindQueueToExchange(String exchangeName,
String queueName) throws Exception
{
- try (Session session = utilityConnection.createSession(Session.SESSION_TRANSACTED))
- {
- final Map<String, Object> arguments = new HashMap<>();
- arguments.put("destination", queueName);
- arguments.put("bindingKey", queueName);
- performOperationUsingAmqpManagement(exchangeName,
- "bind",
- session,
- "org.apache.qpid.FanoutExchange",
- arguments);
- }
- }
-
- private Connection createUtilityConnection() throws JMSException, NamingException, URLSyntaxException
- {
- Connection connection = getConnectionBuilder().build();
- connection.start();
- return connection;
+ final Map<String, Object> arguments = new HashMap<>();
+ arguments.put("destination", queueName);
+ arguments.put("bindingKey", queueName);
+ performOperationUsingAmqpManagement(exchangeName,
+ "bind",
+ "org.apache.qpid.FanoutExchange",
+ arguments);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/6dc32330/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/subscription/SharedSubscriptionTest.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/subscription/SharedSubscriptionTest.java b/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/subscription/SharedSubscriptionTest.java
index 8842ebe..7de96b0 100644
--- a/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/subscription/SharedSubscriptionTest.java
+++ b/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/subscription/SharedSubscriptionTest.java
@@ -19,141 +19,139 @@
package org.apache.qpid.systests.jms_2_0.subscription;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
-import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
-import org.apache.qpid.systest.rest.RestTestHelper;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.test.utils.TestBrokerConfiguration;
-
-public class SharedSubscriptionTest extends QpidBrokerTestCase
-{
-
- private RestTestHelper _restTestHelper;
+import org.junit.Test;
- @Override
- public void setUp() throws Exception
- {
- TestBrokerConfiguration brokerConfiguration = getDefaultBrokerConfiguration();
- brokerConfiguration.addHttpManagementConfiguration();
- super.setUp();
- _restTestHelper = new RestTestHelper(getDefaultBroker().getHttpPort());
- }
+import org.apache.qpid.systests.jms_2_0.Jms2TestBase;
+import org.apache.qpid.systests.jms_2_0.Utils;
+public class SharedSubscriptionTest extends Jms2TestBase
+{
+ @Test
public void testSharedNonDurableSubscription() throws Exception
{
- Connection connection = getConnectionWithPrefetch(0);
-
- Session publishingSession = connection.createSession();
- Session subscriber1Session = connection.createSession();
- Session subscriber2Session = connection.createSession();
-
- String topicName = getTestName();
- Topic topic = publishingSession.createTopic("amq.direct/" + topicName);
+ try (Connection connection = getConnectionBuilder().setPrefetch(0).build())
+ {
+ Session publishingSession = connection.createSession();
+ Session subscriber1Session = connection.createSession();
+ Session subscriber2Session = connection.createSession();
- MessageConsumer consumer1 = subscriber1Session.createSharedConsumer(topic, "subscription");
- MessageConsumer consumer2 = subscriber2Session.createSharedConsumer(topic, "subscription");
+ String topicName = getTestName();
+ Topic topic = publishingSession.createTopic("amq.direct/" + topicName);
- MessageProducer producer = publishingSession.createProducer(topic);
+ MessageConsumer consumer1 = subscriber1Session.createSharedConsumer(topic, "subscription");
+ MessageConsumer consumer2 = subscriber2Session.createSharedConsumer(topic, "subscription");
- sendMessage(publishingSession, topic, 2);
+ Utils.sendMessage(publishingSession, topic, 2);
- connection.start();
+ connection.start();
- Message message1 = consumer1.receive(RECEIVE_TIMEOUT);
- Message message2 = consumer2.receive(RECEIVE_TIMEOUT);
+ Message message1 = consumer1.receive(getReceiveTimeout());
+ Message message2 = consumer2.receive(getReceiveTimeout());
- assertNotNull("Message 1 was not received", message1);
- assertNotNull("Message 2 was not received", message2);
+ assertNotNull("Message 1 was not received", message1);
+ assertNotNull("Message 2 was not received", message2);
- assertEquals("Unexpected index for message 1", 0, message1.getIntProperty(INDEX));
- assertEquals("Unexpected index for message 2", 1, message2.getIntProperty(INDEX));
+ assertEquals("Unexpected index for message 1", 0, message1.getIntProperty(Utils.INDEX));
+ assertEquals("Unexpected index for message 2", 1, message2.getIntProperty(Utils.INDEX));
- Message message3 = consumer1.receive(RECEIVE_TIMEOUT);
- Message message4 = consumer2.receive(RECEIVE_TIMEOUT);
+ Message message3 = consumer1.receive(getReceiveTimeout());
+ Message message4 = consumer2.receive(getReceiveTimeout());
- assertNull("Unexpected message received by first shared consumer", message3);
- assertNull("Unexpected message received by second shared consumer", message4);
+ assertNull("Unexpected message received by first shared consumer", message3);
+ assertNull("Unexpected message received by second shared consumer", message4);
+ }
}
+ @Test
public void testSharedDurableSubscription() throws Exception
{
- Connection connection = getConnectionBuilder().setPrefetch(0).setClientId("myClientId").build();
-
- Session publishingSession = connection.createSession();
- Session subscriber1Session = connection.createSession();
- Session subscriber2Session = connection.createSession();
-
String topicName = getTestName();
- Topic topic = publishingSession.createTopic("amq.direct/" + topicName);
+ try (Connection connection = getConnectionBuilder().setPrefetch(0).setClientId("myClientId").build())
+ {
- MessageConsumer consumer1 = subscriber1Session.createSharedDurableConsumer(topic, "subscription");
- MessageConsumer consumer2 = subscriber2Session.createSharedDurableConsumer(topic, "subscription");
+ Session publishingSession = connection.createSession();
+ Session subscriber1Session = connection.createSession();
+ Session subscriber2Session = connection.createSession();
- MessageProducer producer = publishingSession.createProducer(topic);
+ Topic topic = publishingSession.createTopic("amq.direct/" + topicName);
- sendMessage(publishingSession, topic, 4);
+ MessageConsumer consumer1 = subscriber1Session.createSharedDurableConsumer(topic, "subscription");
+ MessageConsumer consumer2 = subscriber2Session.createSharedDurableConsumer(topic, "subscription");
- connection.start();
+ Utils.sendMessage(publishingSession, topic, 4);
- Message message1 = consumer1.receive(RECEIVE_TIMEOUT);
- Message message2 = consumer2.receive(RECEIVE_TIMEOUT);
+ connection.start();
- assertNotNull("Message 1 was not received", message1);
- assertNotNull("Message 2 was not received", message2);
+ Message message1 = consumer1.receive(getReceiveTimeout());
+ Message message2 = consumer2.receive(getReceiveTimeout());
- assertEquals("Unexpected index for message 1", 0, message1.getIntProperty(INDEX));
- assertEquals("Unexpected index for message 2", 1, message2.getIntProperty(INDEX));
+ assertNotNull("Message 1 was not received", message1);
+ assertNotNull("Message 2 was not received", message2);
- connection.close();
+ assertEquals("Unexpected index for message 1", 0, message1.getIntProperty(Utils.INDEX));
+ assertEquals("Unexpected index for message 2", 1, message2.getIntProperty(Utils.INDEX));
- if (isBrokerStorePersistent())
- {
- restartDefaultBroker();
+ connection.close();
}
- connection = getConnectionBuilder().setPrefetch(0).setClientId("myClientId").build();
- subscriber1Session = connection.createSession();
- subscriber2Session = connection.createSession();
+ if (getBrokerAdmin().supportsRestart())
+ {
+ getBrokerAdmin().restart();
+ }
- consumer1 = subscriber1Session.createSharedDurableConsumer(topic, "subscription");
- consumer2 = subscriber2Session.createSharedDurableConsumer(topic, "subscription");
+ try (Connection connection = getConnectionBuilder().setPrefetch(0).setClientId("myClientId").build())
+ {
+ Session subscriber1Session = connection.createSession();
+ Session subscriber2Session = connection.createSession();
+ Topic topic = subscriber1Session.createTopic("amq.direct/" + topicName);
+ MessageConsumer consumer1 = subscriber1Session.createSharedDurableConsumer(topic, "subscription");
+ MessageConsumer consumer2 = subscriber2Session.createSharedDurableConsumer(topic, "subscription");
- connection.start();
+ connection.start();
- Message message3 = consumer1.receive(RECEIVE_TIMEOUT);
- Message message4 = consumer2.receive(RECEIVE_TIMEOUT);
+ Message message3 = consumer1.receive(getReceiveTimeout());
+ Message message4 = consumer2.receive(getReceiveTimeout());
- assertNotNull("Message 3 was not received", message3);
- assertNotNull("Message 4 was not received", message4);
+ assertNotNull("Message 3 was not received", message3);
+ assertNotNull("Message 4 was not received", message4);
- assertEquals("Unexpected index for message 3", 2, message3.getIntProperty(INDEX));
- assertEquals("Unexpected index for message 4", 3, message4.getIntProperty(INDEX));
+ assertEquals("Unexpected index for message 3", 2, message3.getIntProperty(Utils.INDEX));
+ assertEquals("Unexpected index for message 4", 3, message4.getIntProperty(Utils.INDEX));
- Message message5 = consumer1.receive(RECEIVE_TIMEOUT);
- Message message6 = consumer2.receive(RECEIVE_TIMEOUT);
+ Message message5 = consumer1.receive(getReceiveTimeout());
+ Message message6 = consumer2.receive(getReceiveTimeout());
- assertNull("Unexpected message received by first shared consumer", message5);
- assertNull("Unexpected message received by second shared consumer", message6);
+ assertNull("Unexpected message received by first shared consumer", message5);
+ assertNull("Unexpected message received by second shared consumer", message6);
+ connection.close();
+ }
}
+ @Test
public void testUnsubscribe() throws Exception
{
sharedDurableSubscriptionUnsubscribeTest("myClientId");
}
+ @Test
public void testUnsubscribeForGlobalSharedDurableSubscription() throws Exception
{
sharedDurableSubscriptionUnsubscribeTest(null);
@@ -161,162 +159,187 @@ public class SharedSubscriptionTest extends QpidBrokerTestCase
private void sharedDurableSubscriptionUnsubscribeTest(final String clientId) throws Exception
{
- Connection connection = getConnectionBuilder().setPrefetch(0).setClientId(clientId).build();
- Session session = connection.createSession();
+ String subscriptionName = "testSharedSubscription";
+ int numberOfQueuesBeforeTest = getQueueCount();
+ String topicName = getTestName();
+ try (Connection connection = getConnectionBuilder().setPrefetch(0).setClientId(clientId).build())
+ {
+ Session session = connection.createSession();
- connection.start();
+ connection.start();
- String topicName = getTestName();
- Topic topic = session.createTopic("amq.direct/" + topicName);
- String subscriptionName = "testSharedSubscription";
- MessageConsumer consumer = session.createSharedDurableConsumer(topic, subscriptionName);
+ Topic topic = session.createTopic("amq.direct/" + topicName);
+ MessageConsumer consumer = session.createSharedDurableConsumer(topic, subscriptionName);
- Map<String, Object>
- statistics = _restTestHelper.getJsonAsMap("virtualhost/test/test/getStatistics?statistics=[\"queueCount\"]");
- int numberOfQueuesBeforeUnsubscribe = (int) statistics.get("queueCount");
- assertEquals("Unexpected number of Queues", 1, numberOfQueuesBeforeUnsubscribe);
+ int numberOfQueuesBeforeUnsubscribe = getQueueCount();
+ assertEquals("Unexpected number of Queues", numberOfQueuesBeforeTest + 1, numberOfQueuesBeforeUnsubscribe);
- consumer.close();
- session.close();
- connection.close();
+ consumer.close();
+ session.close();
+ connection.close();
+ }
- if (isBrokerStorePersistent())
+ if (getBrokerAdmin().supportsRestart())
{
- restartDefaultBroker();
+ getBrokerAdmin().restart();
}
- connection = getConnectionBuilder().setPrefetch(0).setClientId(clientId).build();
- session = connection.createSession();
- session.unsubscribe(subscriptionName);
+ try (Connection connection = getConnectionBuilder().setPrefetch(0).setClientId(clientId).build())
+ {
+ final Session session = connection.createSession();
+ session.unsubscribe(subscriptionName);
- statistics = _restTestHelper.getJsonAsMap("virtualhost/test/test/getStatistics?statistics=[\"queueCount\"]");
- int numberOfQueuesAfterUnsubscribe = (int) statistics.get("queueCount");
- assertEquals("Queue should be deleted", 0, numberOfQueuesAfterUnsubscribe);
+ int numberOfQueuesAfterUnsubscribe = getQueueCount();
+ assertEquals("Queue should be deleted", numberOfQueuesBeforeTest, numberOfQueuesAfterUnsubscribe);
+ }
}
- public void testDurableSharedAndNonDurableSharedCanUseTheSameSubscriptionName() throws Exception
+ private int getQueueCount() throws Exception
{
- Connection connection = getConnectionWithPrefetch(0);
+ Map<String, Object> arguments = Collections.singletonMap("statistics",
+ Collections.singletonList("queueCount"));
+ Object statistics = performOperationUsingAmqpManagement(getVirtualHostName(),
+ "getStatistics",
+ "org.apache.qpid.VirtualHost",
+ arguments);
+
+ assertNotNull("Statistics is null", statistics);
+ assertTrue("Statistics is not map", statistics instanceof Map);
+ @SuppressWarnings("unchecked")
+ Map<String, Object> statisticsMap = (Map<String, Object>) statistics;
+ return ((Number) statisticsMap.get("queueCount")).intValue();
+ }
- Session publishingSession = connection.createSession();
- Session subscriberSession = connection.createSession();
+ @Test
+ public void testDurableSharedAndNonDurableSharedCanUseTheSameSubscriptionName() throws Exception
+ {
+ try (Connection connection = getConnectionBuilder().setPrefetch(0).build())
+ {
+ Session publishingSession = connection.createSession();
+ Session subscriberSession = connection.createSession();
- String topicName = getTestName();
- Topic topic = publishingSession.createTopic("amq.direct/" + topicName);
- MessageConsumer consumer1 = subscriberSession.createSharedDurableConsumer(topic, "testSharedSubscription");
- MessageConsumer consumer2 = subscriberSession.createSharedConsumer(topic, "testSharedSubscription");
- connection.start();
+ String topicName = getTestName();
+ Topic topic = publishingSession.createTopic("amq.direct/" + topicName);
+ MessageConsumer consumer1 = subscriberSession.createSharedDurableConsumer(topic, "testSharedSubscription");
+ MessageConsumer consumer2 = subscriberSession.createSharedConsumer(topic, "testSharedSubscription");
+ connection.start();
- sendMessage(publishingSession, topic, 1);
+ Utils.sendMessage(publishingSession, topic, 1);
- Message message1 = consumer1.receive(RECEIVE_TIMEOUT);
- Message message2 = consumer2.receive(RECEIVE_TIMEOUT);
+ Message message1 = consumer1.receive(getReceiveTimeout());
+ Message message2 = consumer2.receive(getReceiveTimeout());
- assertNotNull("Message 1 was not received", message1);
- assertNotNull("Message 2 was not received", message2);
+ assertNotNull("Message 1 was not received", message1);
+ assertNotNull("Message 2 was not received", message2);
- assertEquals("Unexpected index for message 1", 0, message1.getIntProperty(INDEX));
- assertEquals("Unexpected index for message 2", 0, message2.getIntProperty(INDEX));
+ assertEquals("Unexpected index for message 1", 0, message1.getIntProperty(Utils.INDEX));
+ assertEquals("Unexpected index for message 2", 0, message2.getIntProperty(Utils.INDEX));
+ }
}
+ @Test
public void testGlobalAndNotGlobalCanUseTheSameSubscriptionName() throws Exception
{
- Connection connection = getClientConnection(GUEST_USERNAME, GUEST_PASSWORD, "testClientId");
- Connection connection2 = getClientConnection(GUEST_USERNAME, GUEST_PASSWORD, null);
-
- Session publishingSession = connection.createSession();
- Session subscriber1Session = connection.createSession();
- Session subscriber2Session = connection2.createSession();
+ try (Connection connection = getConnectionBuilder().setClientId("testClientId").build();
+ Connection connection2 = getConnectionBuilder().setClientId(null).build())
+ {
+ Session publishingSession = connection.createSession();
+ Session subscriber1Session = connection.createSession();
+ Session subscriber2Session = connection2.createSession();
- String topicName = getTestName();
- Topic topic = publishingSession.createTopic("amq.direct/" + topicName);
- MessageConsumer consumer1 = subscriber1Session.createSharedConsumer(topic, "testSharedSubscription");
- MessageConsumer consumer2 = subscriber2Session.createSharedConsumer(topic, "testSharedSubscription");
- connection.start();
- connection2.start();
+ String topicName = getTestName();
+ Topic topic = publishingSession.createTopic("amq.direct/" + topicName);
+ MessageConsumer consumer1 = subscriber1Session.createSharedConsumer(topic, "testSharedSubscription");
+ MessageConsumer consumer2 = subscriber2Session.createSharedConsumer(topic, "testSharedSubscription");
+ connection.start();
+ connection2.start();
- sendMessage(publishingSession, topic, 1);
+ Utils.sendMessage(publishingSession, topic, 1);
- Message message1 = consumer1.receive(RECEIVE_TIMEOUT);
- Message message2 = consumer2.receive(RECEIVE_TIMEOUT);
+ Message message1 = consumer1.receive(getReceiveTimeout());
+ Message message2 = consumer2.receive(getReceiveTimeout());
- assertNotNull("Message 1 was not received", message1);
- assertNotNull("Message 2 was not received", message2);
+ assertNotNull("Message 1 was not received", message1);
+ assertNotNull("Message 2 was not received", message2);
- assertEquals("Unexpected index for message 1", 0, message1.getIntProperty(INDEX));
- assertEquals("Unexpected index for message 2", 0, message2.getIntProperty(INDEX));
+ assertEquals("Unexpected index for message 1", 0, message1.getIntProperty(Utils.INDEX));
+ assertEquals("Unexpected index for message 2", 0, message2.getIntProperty(Utils.INDEX));
+ }
}
+ @Test
public void testTopicOrSelectorChange() throws Exception
{
- final Map<String, String> options = new HashMap<>();
- options.put("jms.prefetchPolicy.all", "0");
- options.put("jms.clientID", null);
- Connection connection = getConnectionWithOptions(options);
- Connection connection2 = getConnectionWithOptions(options);
-
- Session publishingSession = connection.createSession();
- Session subscriber1Session = connection.createSession();
- Session subscriber2Session = connection2.createSession();
-
- String topicName = getTestName();
- Topic topic = publishingSession.createTopic("amq.direct/" + topicName);
-
- MessageConsumer consumer1 = subscriber1Session.createSharedDurableConsumer(topic, "subscription", "index>1");
-
- MessageProducer producer = publishingSession.createProducer(topic);
-
- sendMessage(publishingSession, topic, 4);
-
- connection.start();
- connection2.start();
-
- Message message1 = consumer1.receive(RECEIVE_TIMEOUT);
- assertNotNull("Message 1 was not received", message1);
- assertEquals("Unexpected index for message 1", 2, message1.getIntProperty(INDEX));
-
- try
- {
- subscriber2Session.createSharedDurableConsumer(topic, "subscription", "index>2");
- fail("Consumer should not be allowed to join shared subscription with different filter when there is an active subscriber");
- }
- catch (JMSException e)
- {
- // pass
- }
- Topic topic2 = publishingSession.createTopic("amq.direct/" + topicName + "2");
- try
- {
- subscriber2Session.createSharedDurableConsumer(topic2, "subscription", "index>1");
- fail("Consumer should not be allowed to join shared subscription with different topic when there is an active subscriber");
- }
- catch (JMSException e)
+ try (Connection connection = getConnectionBuilder().setPrefetch(0).setClientId(null).build();
+ Connection connection2 = getConnectionBuilder().setPrefetch(0).setClientId(null).build())
{
- // pass
+ Session publishingSession = connection.createSession();
+ Session subscriber1Session = connection.createSession();
+ Session subscriber2Session = connection2.createSession();
+
+ String topicName = getTestName();
+ Topic topic = publishingSession.createTopic("amq.direct/" + topicName);
+
+ MessageConsumer consumer1 =
+ subscriber1Session.createSharedDurableConsumer(topic, "subscription", "index>1");
+
+ Utils.sendMessage(publishingSession, topic, 4);
+
+ connection.start();
+ connection2.start();
+
+ Message message1 = consumer1.receive(getReceiveTimeout());
+ assertNotNull("Message 1 was not received", message1);
+ assertEquals("Unexpected index for message 1", 2, message1.getIntProperty(Utils.INDEX));
+
+ try
+ {
+ subscriber2Session.createSharedDurableConsumer(topic, "subscription", "index>2");
+ fail("Consumer should not be allowed to join shared subscription with different filter when there is an active subscriber");
+ }
+ catch (JMSException e)
+ {
+ // pass
+ }
+ Topic topic2 = publishingSession.createTopic("amq.direct/" + topicName + "2");
+ try
+ {
+ subscriber2Session.createSharedDurableConsumer(topic2, "subscription", "index>1");
+ fail("Consumer should not be allowed to join shared subscription with different topic when there is an active subscriber");
+ }
+ catch (JMSException e)
+ {
+ // pass
+ }
+ consumer1.close();
+ MessageConsumer consumer2 =
+ subscriber2Session.createSharedDurableConsumer(topic, "subscription", "index>2");
+
+ Message message2 = consumer2.receive(getReceiveTimeout());
+ assertNull(
+ "No message should be received as re-subscribing with different topic or selector is equivalent to unsubscribe/subscribe",
+ message2);
+
+ Utils.sendMessage(publishingSession, topic, 4);
+
+ Message message3 = consumer2.receive(getReceiveTimeout());
+ assertNotNull("Should receive message 3", message3);
+ assertEquals("Unexpected index for message 3", 3, message3.getIntProperty(Utils.INDEX));
+ consumer2.close();
+
+ MessageConsumer consumer3 =
+ subscriber2Session.createSharedDurableConsumer(topic2, "subscription", "index>2");
+ Message message4 = consumer3.receive(getReceiveTimeout());
+
+ assertNull(
+ "No message should be received as re-subscribing with different topic or selector is equivalent to unsubscribe/subscribe",
+ message4);
+
+ Utils.sendMessage(publishingSession, topic2, 4);
+
+ Message message5 = consumer3.receive(getReceiveTimeout());
+ assertEquals("Unexpected index for message 5", 3, message5.getIntProperty(Utils.INDEX));
}
- consumer1.close();
- MessageConsumer consumer2 = subscriber2Session.createSharedDurableConsumer(topic, "subscription", "index>2");
-
- Message message2 = consumer2.receive(RECEIVE_TIMEOUT);
- assertNull("No message should be received as re-subscribing with different topic or selector is equivalent to unsubscribe/subscribe", message2);
-
- sendMessage(publishingSession, topic, 4);
-
- Message message3 = consumer2.receive(RECEIVE_TIMEOUT);
- assertNotNull("Should receive message 3", message3);
- assertEquals("Unexpected index for message 3", 3, message3.getIntProperty(INDEX));
- consumer2.close();
-
- MessageConsumer consumer3 = subscriber2Session.createSharedDurableConsumer(topic2, "subscription", "index>2");
- Message message4 = consumer3.receive(RECEIVE_TIMEOUT);
-
- assertNull("No message should be received as re-subscribing with different topic or selector is equivalent to unsubscribe/subscribe", message4);
-
- sendMessage(publishingSession, topic2, 4);
-
- Message message5 = consumer3.receive(RECEIVE_TIMEOUT);
- assertEquals("Unexpected index for message 5", 3, message5.getIntProperty(INDEX));
-
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/6dc32330/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java
----------------------------------------------------------------------
diff --git a/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java b/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java
index 041604a..fceba26 100644
--- a/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java
+++ b/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java
@@ -43,11 +43,11 @@ import com.fasterxml.jackson.databind.ObjectMapper;
public class AmqpManagementFacade
{
- private final QpidBrokerTestCase _qpidBrokerTestCase;
+ private final String _managementAddress;
- public AmqpManagementFacade(QpidBrokerTestCase _qpidBrokerTestCase)
+ public AmqpManagementFacade(final String managementAddress)
{
- this._qpidBrokerTestCase = _qpidBrokerTestCase;
+ _managementAddress = managementAddress;
}
public void createEntityUsingAmqpManagement(final String name, final Session session, final String type)
@@ -62,9 +62,7 @@ public class AmqpManagementFacade
Map<String, Object> attributes)
throws JMSException
{
- MessageProducer producer = session.createProducer(session.createQueue(_qpidBrokerTestCase.isBroker10()
- ? "$management"
- : "ADDR:$management"));
+ MessageProducer producer = session.createProducer(session.createQueue(_managementAddress));
MapMessage createMessage = session.createMapMessage();
createMessage.setStringProperty("type", type);
@@ -89,9 +87,7 @@ public class AmqpManagementFacade
Map<String, Object> attributes)
throws JMSException
{
- MessageProducer producer = session.createProducer(session.createQueue(_qpidBrokerTestCase.isBroker10()
- ? "$management"
- : "ADDR:$management"));
+ MessageProducer producer = session.createProducer(session.createQueue(_managementAddress));
MapMessage createMessage = session.createMapMessage();
createMessage.setStringProperty("type", type);
@@ -113,9 +109,7 @@ public class AmqpManagementFacade
public void deleteEntityUsingAmqpManagement(final String name, final Session session, final String type)
throws JMSException
{
- MessageProducer producer = session.createProducer(session.createQueue(_qpidBrokerTestCase.isBroker10()
- ? "$management"
- : "ADDR:$management"));
+ MessageProducer producer = session.createProducer(session.createQueue(_managementAddress));
MapMessage createMessage = session.createMapMessage();
createMessage.setStringProperty("type", type);
@@ -137,9 +131,7 @@ public class AmqpManagementFacade
Map<String, Object> arguments)
throws JMSException
{
- MessageProducer producer = session.createProducer(session.createQueue(_qpidBrokerTestCase.isBroker10()
- ? "$management"
- : "ADDR:$management"));
+ MessageProducer producer = session.createProducer(session.createQueue(_managementAddress));
final TemporaryQueue responseQ = session.createTemporaryQueue();
MessageConsumer consumer = session.createConsumer(responseQ);
MapMessage opMessage = session.createMapMessage();
@@ -275,9 +267,7 @@ public class AmqpManagementFacade
final String name,
final boolean actuals) throws JMSException
{
- MessageProducer producer = session.createProducer(session.createQueue(_qpidBrokerTestCase.isBroker10()
- ? "$management"
- : "ADDR:$management"));
+ MessageProducer producer = session.createProducer(session.createQueue(_managementAddress));
final TemporaryQueue responseQueue = session.createTemporaryQueue();
MessageConsumer consumer = session.createConsumer(responseQueue);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/6dc32330/systests/src/main/java/org/apache/qpid/test/utils/ConnectionBuilder.java
----------------------------------------------------------------------
diff --git a/systests/src/main/java/org/apache/qpid/test/utils/ConnectionBuilder.java b/systests/src/main/java/org/apache/qpid/test/utils/ConnectionBuilder.java
index 8390614..afbdba8 100644
--- a/systests/src/main/java/org/apache/qpid/test/utils/ConnectionBuilder.java
+++ b/systests/src/main/java/org/apache/qpid/test/utils/ConnectionBuilder.java
@@ -21,6 +21,7 @@
package org.apache.qpid.test.utils;
import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.naming.NamingException;
@@ -28,6 +29,9 @@ import org.apache.qpid.url.URLSyntaxException;
public interface ConnectionBuilder
{
+ ConnectionBuilder setHost(String host);
+ ConnectionBuilder setPort(int port);
+ ConnectionBuilder setSslPort(int port);
ConnectionBuilder setPrefetch(int prefetch);
ConnectionBuilder setClientId(String clientId);
ConnectionBuilder setUsername(String username);
@@ -39,5 +43,5 @@ public interface ConnectionBuilder
ConnectionBuilder setSyncPublish(boolean syncPublish);
Connection build() throws NamingException, JMSException, URLSyntaxException;
-
+ ConnectionFactory buildConnectionFactory() throws NamingException, URLSyntaxException;
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/6dc32330/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
----------------------------------------------------------------------
diff --git a/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
index 455b864..cda5895 100755
--- a/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
+++ b/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
@@ -75,7 +75,7 @@ public class QpidBrokerTestCase extends QpidTestCase
private final Map<String, String> _propertiesSetForBroker = new HashMap<>();
private final List<Connection> _connections = new ArrayList<>();
- private final AmqpManagementFacade _managementFacade = new AmqpManagementFacade(this);
+ private AmqpManagementFacade _managementFacade;
private BrokerHolder _defaultBroker;
private MessageType _messageType = MessageType.TEXT;
private JmsProvider _jmsProvider;
@@ -85,6 +85,7 @@ public class QpidBrokerTestCase extends QpidTestCase
{
try
{
+ _managementFacade = new AmqpManagementFacade(isBroker10() ? "$management" : "ADDR:$management");
_jmsProvider = isBroker10() ? new QpidJmsClientProvider(_managementFacade) : new QpidJmsClient0xProvider(_managementFacade);
_defaultBroker = new BrokerHolderFactory().create(DEFAULT_BROKER_TYPE, DEFAULT_PORT, this);
@@ -223,7 +224,7 @@ public class QpidBrokerTestCase extends QpidTestCase
public ConnectionBuilder getConnectionBuilder()
{
- return _jmsProvider.getConnectionBuilder();
+ return _jmsProvider.getConnectionBuilder().setVirtualHost("test");
}
/**
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/6dc32330/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClient0xConnectionBuilder.java
----------------------------------------------------------------------
diff --git a/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClient0xConnectionBuilder.java b/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClient0xConnectionBuilder.java
index 5beaed8..0c56f01 100644
--- a/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClient0xConnectionBuilder.java
+++ b/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClient0xConnectionBuilder.java
@@ -46,6 +46,30 @@ public class QpidJmsClient0xConnectionBuilder implements ConnectionBuilder
private boolean _enableFailover;
private final Map<String, Object> _options = new TreeMap<>();
private int _reconnectAttempts = 20;
+ private String _host;
+ private int _port;
+ private int _sslPort;
+
+ @Override
+ public ConnectionBuilder setHost(final String host)
+ {
+ _host = host;
+ return this;
+ }
+
+ @Override
+ public ConnectionBuilder setPort(final int port)
+ {
+ _port = port;
+ return this;
+ }
+
+ @Override
+ public ConnectionBuilder setSslPort(final int port)
+ {
+ _sslPort = port;
+ return this;
+ }
@Override
public ConnectionBuilder setPrefetch(final int prefetch)
@@ -120,6 +144,12 @@ public class QpidJmsClient0xConnectionBuilder implements ConnectionBuilder
@Override
public Connection build() throws JMSException, NamingException, URLSyntaxException
{
+ return buildConnectionFactory().createConnection(_username, _password);
+ }
+
+ @Override
+ public ConnectionFactory buildConnectionFactory() throws NamingException, URLSyntaxException
+ {
Properties contextProperties = new Properties();
contextProperties.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
contextProperties.put(Context.PROVIDER_URL, System.getProperty(Context.PROVIDER_URL));
@@ -170,8 +200,6 @@ public class QpidJmsClient0xConnectionBuilder implements ConnectionBuilder
curl = new AMQConnectionURL(curl.toString());
connectionFactory = new AMQConnectionFactory(curl);
-
-
- return connectionFactory.createConnection(_username, _password);
+ return connectionFactory;
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/6dc32330/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClientConnectionBuilder.java
----------------------------------------------------------------------
diff --git a/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClientConnectionBuilder.java b/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClientConnectionBuilder.java
index fe0761c..459c497 100644
--- a/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClientConnectionBuilder.java
+++ b/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClientConnectionBuilder.java
@@ -37,8 +37,9 @@ public class QpidJmsClientConnectionBuilder implements ConnectionBuilder
{
private static final AtomicInteger CLIENTID_COUNTER = new AtomicInteger();
- private String _username;
- private String _password;
+ private String _host;
+ private int _port;
+ private int _sslPort;
private Map<String, Object> _options;
private boolean _enableTls;
private boolean _enableFailover;
@@ -47,9 +48,32 @@ public class QpidJmsClientConnectionBuilder implements ConnectionBuilder
{
_options = new TreeMap<>();
_options.put("jms.clientID", getNextClientId());
- _options.put("amqp.vhost", "test");
- _username = "guest";
- _password = "guest";
+ _options.put("jms.username", "guest");
+ _options.put("jms.password", "guest");
+ _port = Integer.getInteger("test.port");
+ _sslPort = Integer.getInteger("test.port.ssl");
+ _host = "localhost";
+ }
+
+ @Override
+ public ConnectionBuilder setHost(final String host)
+ {
+ _host = host;
+ return this;
+ }
+
+ @Override
+ public ConnectionBuilder setPort(final int port)
+ {
+ _port = port;
+ return this;
+ }
+
+ @Override
+ public ConnectionBuilder setSslPort(final int port)
+ {
+ _sslPort = port;
+ return this;
}
@Override
@@ -76,14 +100,28 @@ public class QpidJmsClientConnectionBuilder implements ConnectionBuilder
@Override
public ConnectionBuilder setUsername(final String username)
{
- _username = username;
+ if (username == null)
+ {
+ _options.remove("jms.username");
+ }
+ else
+ {
+ _options.put("jms.username", username);
+ }
return this;
}
@Override
public ConnectionBuilder setPassword(final String password)
{
- _password = password;
+ if (password == null)
+ {
+ _options.remove("jms.password");
+ }
+ else
+ {
+ _options.put("jms.password", password);
+ }
return this;
}
@@ -125,6 +163,12 @@ public class QpidJmsClientConnectionBuilder implements ConnectionBuilder
@Override
public Connection build() throws NamingException, JMSException
{
+ return buildConnectionFactory().createConnection();
+ }
+
+ @Override
+ public ConnectionFactory buildConnectionFactory() throws NamingException
+ {
final Hashtable<Object, Object> initialContextEnvironment = new Hashtable<>();
final String factoryName;
@@ -136,8 +180,10 @@ public class QpidJmsClientConnectionBuilder implements ConnectionBuilder
{
options.put("failover.maxReconnectAttempts", "2");
}
- final StringBuilder stem = new StringBuilder("failover:(amqp://localhost:")
- .append(System.getProperty("test.port"))
+ final StringBuilder stem = new StringBuilder("failover:(amqp://")
+ .append(_host)
+ .append(":")
+ .append(_port)
.append(",amqp://localhost:")
.append(System.getProperty("test.port.alt"))
.append(")");
@@ -150,7 +196,7 @@ public class QpidJmsClientConnectionBuilder implements ConnectionBuilder
else if (!_enableTls)
{
final StringBuilder stem =
- new StringBuilder("amqp://localhost:").append(System.getProperty("test.port"));
+ new StringBuilder("amqp://").append(_host).append(":").append(_port);
appendOptions(options, stem);
@@ -159,20 +205,14 @@ public class QpidJmsClientConnectionBuilder implements ConnectionBuilder
}
else
{
-
- final StringBuilder stem = new StringBuilder("amqps://localhost:").append(String.valueOf(System.getProperty("test.port.ssl")));
+ final StringBuilder stem = new StringBuilder("amqps://").append(_host).append(":").append(_sslPort);
appendOptions(options, stem);
initialContextEnvironment.put("connectionfactory.default.ssl", stem.toString());
factoryName = "default.ssl";
}
- final ConnectionFactory connectionFactory =
- (ConnectionFactory) new InitialContext(initialContextEnvironment).lookup(factoryName);
-
- return connectionFactory.createConnection(_username, _password);
+ return (ConnectionFactory) new InitialContext(initialContextEnvironment).lookup(factoryName);
}
-
-
private void appendOptions(final Map<String, Object> actualOptions, final StringBuilder stem)
{
boolean first = true;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/6dc32330/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java
----------------------------------------------------------------------
diff --git a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java
index 68f0ab4..f171f4d 100644
--- a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java
+++ b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java
@@ -193,7 +193,7 @@ public class EmbeddedBrokerPerClassAdminImpl implements BrokerAdmin
{
throw new IllegalStateException(String.format("Could not find port with name '%s' on the Broker", portType.name()));
}
- return new InetSocketAddress(port);
+ return InetSocketAddress.createUnresolved("localhost", port);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[6/7] qpid-broker-j git commit: QPID-6933: [System Tests] Remove jndi
client settings from profiles
Posted by or...@apache.org.
QPID-6933: [System Tests] Remove jndi client settings from profiles
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/7336b520
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/7336b520
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/7336b520
Branch: refs/heads/master
Commit: 7336b5203f5b190d0671473d36ea060cdf3fd165
Parents: c47109b
Author: Alex Rudyy <or...@apache.org>
Authored: Sun Nov 26 20:08:42 2017 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Tue Nov 28 07:37:08 2017 +0000
----------------------------------------------------------------------
pom.xml | 12 ------------
systests/qpid-systests-jms_2.0/pom.xml | 11 -----------
2 files changed, 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7336b520/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d2e3087..3fe6f46 100644
--- a/pom.xml
+++ b/pom.xml
@@ -111,8 +111,6 @@
<profile.virtualhostnode.type>Memory</profile.virtualhostnode.type>
<profile.broker.clean.between.tests>true</profile.broker.clean.between.tests>
<profile.test_receive_timeout>1000</profile.test_receive_timeout>
- <profile.java.naming.factory.initial>org.apache.qpid.jndi.PropertiesFileInitialContextFactory</profile.java.naming.factory.initial>
- <profile.java.naming.provider.url>test-profiles${file.separator}test-provider.properties</profile.java.naming.provider.url>
<profile.qpid.tests.mms.messagestore.persistence>true</profile.qpid.tests.mms.messagestore.persistence>
<dollar.sign>$</dollar.sign>
@@ -810,8 +808,6 @@
<broker.clean.between.tests>true</broker.clean.between.tests>
<qpid.test_receive_timeout>${profile.test_receive_timeout}</qpid.test_receive_timeout>
<qpid.tests.mms.messagestore.persistence>${profile.qpid.tests.mms.messagestore.persistence}</qpid.tests.mms.messagestore.persistence>
- <java.naming.factory.initial>${profile.java.naming.factory.initial}</java.naming.factory.initial>
- <java.naming.provider.url>${profile.java.naming.provider.url}</java.naming.provider.url>
<java.io.tmpdir>${java.io.tmpdir}</java.io.tmpdir>
</systemPropertyVariables>
</configuration>
@@ -1247,8 +1243,6 @@
<profile.broker.persistent>false</profile.broker.persistent>
<profile.virtualhostnode.type>Memory</profile.virtualhostnode.type>
<profile.virtualhostnode.context.blueprint>{"type":"ProvidedStore","globalAddressDomains":"${dollar.sign}{qpid.globalAddressDomains}"}</profile.virtualhostnode.context.blueprint>
- <profile.java.naming.factory.initial>org.apache.qpid.jms.jndi.JmsInitialContextFactory</profile.java.naming.factory.initial>
- <profile.java.naming.provider.url>test-profiles${file.separator}test-provider-1-0.properties</profile.java.naming.provider.url>
<profile.qpid.tests.mms.messagestore.persistence>true</profile.qpid.tests.mms.messagestore.persistence>
</properties>
</profile>
@@ -1269,8 +1263,6 @@
<profile.broker.persistent>true</profile.broker.persistent>
<profile.virtualhostnode.type>BDB</profile.virtualhostnode.type>
<profile.virtualhostnode.context.blueprint>{"type":"ProvidedStore","globalAddressDomains":"${dollar.sign}{qpid.globalAddressDomains}"}</profile.virtualhostnode.context.blueprint>
- <profile.java.naming.factory.initial>org.apache.qpid.jms.jndi.JmsInitialContextFactory</profile.java.naming.factory.initial>
- <profile.java.naming.provider.url>test-profiles${file.separator}test-provider-1-0.properties</profile.java.naming.provider.url>
</properties>
</profile>
@@ -1290,8 +1282,6 @@
<profile.broker.persistent>true</profile.broker.persistent>
<profile.virtualhostnode.type>DERBY</profile.virtualhostnode.type>
<profile.virtualhostnode.context.blueprint>{"type":"ProvidedStore","globalAddressDomains":"${dollar.sign}{qpid.globalAddressDomains}"}</profile.virtualhostnode.context.blueprint>
- <profile.java.naming.factory.initial>org.apache.qpid.jms.jndi.JmsInitialContextFactory</profile.java.naming.factory.initial>
- <profile.java.naming.provider.url>test-profiles${file.separator}test-provider-1-0.properties</profile.java.naming.provider.url>
</properties>
</profile>
@@ -1316,8 +1306,6 @@
<profile.broker.persistent>true</profile.broker.persistent>
<profile.virtualhostnode.type>JSON</profile.virtualhostnode.type>
<profile.virtualhostnode.context.blueprint>{"type":"BDB","globalAddressDomains":"${dollar.sign}{qpid.globalAddressDomains}"}</profile.virtualhostnode.context.blueprint>
- <profile.java.naming.factory.initial>org.apache.qpid.jms.jndi.JmsInitialContextFactory</profile.java.naming.factory.initial>
- <profile.java.naming.provider.url>test-profiles${file.separator}test-provider-1-0.properties</profile.java.naming.provider.url>
</properties>
</profile>
<profile>
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7336b520/systests/qpid-systests-jms_2.0/pom.xml
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_2.0/pom.xml b/systests/qpid-systests-jms_2.0/pom.xml
index 9db61d4..b293574 100644
--- a/systests/qpid-systests-jms_2.0/pom.xml
+++ b/systests/qpid-systests-jms_2.0/pom.xml
@@ -28,17 +28,6 @@
<name>Apache Qpid Broker-J JMS 2.0 System Tests</name>
<description>JMS 2.0 system tests</description>
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <test.working.directory>${basedir}/../..</test.working.directory>
- <test.resource.directory>${basedir}/../..</test.resource.directory>
- <test.systest.resource.directory>${basedir}/../../systests</test.systest.resource.directory>
- <profile.broker.version>v1_0</profile.broker.version>
- <profile.test.amqp_port_protocols>["AMQP_1_0"]</profile.test.amqp_port_protocols>
- <profile.java.naming.factory.initial>org.apache.qpid.jms.jndi.JmsInitialContextFactory</profile.java.naming.factory.initial>
- <profile.java.naming.provider.url>test-profiles${file.separator}test-provider-1-0.properties</profile.java.naming.provider.url>
- </properties>
-
<dependencies>
<dependency>
<groupId>org.apache.qpid</groupId>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[5/7] qpid-broker-j git commit: QPID-6933: [System Tests] Remove
compile time dependencies to legacy JMS client classes from JMSProvider
implementations and reduce the amount of methods to create connection
Posted by or...@apache.org.
QPID-6933: [System Tests] Remove compile time dependencies to legacy JMS client classes from JMSProvider implementations and reduce the amount of methods to create connection
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/9089861a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/9089861a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/9089861a
Branch: refs/heads/master
Commit: 9089861a47551cb6caee363c961655e94c7e97fd
Parents: 6dc3233
Author: Alex Rudyy <or...@apache.org>
Authored: Mon Nov 27 23:21:25 2017 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Tue Nov 28 07:37:08 2017 +0000
----------------------------------------------------------------------
.../qpid/test/utils/AmqpManagementFacade.java | 23 +-
.../qpid/test/utils/ConnectionBuilder.java | 10 +-
.../org/apache/qpid/test/utils/JmsProvider.java | 38 +--
.../qpid/test/utils/QpidBrokerTestCase.java | 96 +++---
.../utils/QpidJmsClient0xConnectionBuilder.java | 128 +++++---
.../test/utils/QpidJmsClient0xProvider.java | 301 +++++++------------
.../utils/QpidJmsClientConnectionBuilder.java | 46 +--
.../qpid/test/utils/QpidJmsClientProvider.java | 295 +++---------------
.../client/failover/FailoverBehaviourTest.java | 4 +-
.../ObjectMessageClassWhitelistingTest.java | 4 +-
.../org/apache/qpid/client/ssl/SSLTest.java | 8 +-
.../qpid/server/AbruptClientDisconnectTest.java | 2 +-
.../message/MessageProtocolConversionTest.java | 9 +-
.../manager/ExternalAuthenticationTest.java | 2 +-
.../qpid/systest/MessageCompressionTest.java | 16 +-
.../management/amqp/AmqpManagementTest.java | 10 +-
.../AddressBasedDestinationTest.java | 2 +-
.../apache/qpid/test/unit/message/UTF8Test.java | 46 ++-
.../qpid/test/utils/FailoverBaseCase.java | 16 +-
test-profiles/Java10UninvestigatedTestsExcludes | 3 +
20 files changed, 416 insertions(+), 643 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/9089861a/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java
----------------------------------------------------------------------
diff --git a/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java b/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java
index fceba26..e7408ba 100644
--- a/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java
+++ b/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java
@@ -174,6 +174,11 @@ public class AmqpManagementFacade
Message response = consumer.receive(5000);
try
{
+ int statusCode = response.getIntProperty("statusCode");
+ if (statusCode < 200 || statusCode > 299)
+ {
+ throw new OperationUnsuccessfulException(statusCode);
+ }
if (response instanceof MapMessage)
{
MapMessage bodyMap = (MapMessage) response;
@@ -339,4 +344,20 @@ public class AmqpManagementFacade
}
return results;
}
-}
\ No newline at end of file
+
+ public static class OperationUnsuccessfulException extends RuntimeException
+ {
+ private final int _statusCode;
+
+ private OperationUnsuccessfulException(final int statusCode)
+ {
+ super();
+ _statusCode = statusCode;
+ }
+
+ public int getStatusCode()
+ {
+ return _statusCode;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/9089861a/systests/src/main/java/org/apache/qpid/test/utils/ConnectionBuilder.java
----------------------------------------------------------------------
diff --git a/systests/src/main/java/org/apache/qpid/test/utils/ConnectionBuilder.java b/systests/src/main/java/org/apache/qpid/test/utils/ConnectionBuilder.java
index afbdba8..70fd7fb 100644
--- a/systests/src/main/java/org/apache/qpid/test/utils/ConnectionBuilder.java
+++ b/systests/src/main/java/org/apache/qpid/test/utils/ConnectionBuilder.java
@@ -20,13 +20,13 @@
package org.apache.qpid.test.utils;
+import java.util.Map;
+
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.naming.NamingException;
-import org.apache.qpid.url.URLSyntaxException;
-
public interface ConnectionBuilder
{
ConnectionBuilder setHost(String host);
@@ -41,7 +41,9 @@ public interface ConnectionBuilder
ConnectionBuilder setFailoverReconnectAttempts(int reconnectAttempts);
ConnectionBuilder setTls(boolean enableTls);
ConnectionBuilder setSyncPublish(boolean syncPublish);
+ ConnectionBuilder setOptions(Map<String, String> options);
+ ConnectionBuilder setPopulateJMSXUserID(boolean populateJMSXUserID);
- Connection build() throws NamingException, JMSException, URLSyntaxException;
- ConnectionFactory buildConnectionFactory() throws NamingException, URLSyntaxException;
+ Connection build() throws NamingException, JMSException;
+ ConnectionFactory buildConnectionFactory() throws NamingException;
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/9089861a/systests/src/main/java/org/apache/qpid/test/utils/JmsProvider.java
----------------------------------------------------------------------
diff --git a/systests/src/main/java/org/apache/qpid/test/utils/JmsProvider.java b/systests/src/main/java/org/apache/qpid/test/utils/JmsProvider.java
index 6e21325..1dbd289 100644
--- a/systests/src/main/java/org/apache/qpid/test/utils/JmsProvider.java
+++ b/systests/src/main/java/org/apache/qpid/test/utils/JmsProvider.java
@@ -29,50 +29,18 @@ import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
-import javax.naming.InitialContext;
import javax.naming.NamingException;
public interface JmsProvider
{
-
ConnectionFactory getConnectionFactory() throws NamingException;
ConnectionFactory getConnectionFactory(Map<String, String> options) throws NamingException;
- ConnectionFactory getConnectionFactory(String factoryName) throws NamingException;
-
- ConnectionFactory getConnectionFactory(String factoryName, String vhost, String clientId) throws NamingException;
-
- ConnectionFactory getConnectionFactory(String factoryName,
- String vhost,
- String clientId,
- Map<String, String> options)
- throws NamingException;
-
- Connection getConnection() throws JMSException, NamingException;
-
- Connection getConnectionWithPrefetch(int prefetch) throws Exception;
-
- Connection getConnectionWithOptions(Map<String, String> options) throws Exception;
-
- Connection getConnectionWithOptions(String vhost, Map<String, String> options) throws Exception;
-
- Connection getConnectionForVHost(String vhost)
- throws Exception;
-
- Connection getConnectionForVHost(String vhost, String username, String password)
- throws Exception;
-
Connection getConnection(String urlString) throws Exception;
- Connection getConnection(String username, String password) throws JMSException, NamingException;
-
- Connection getConnectionWithSyncPublishing() throws Exception;
-
- Connection getClientConnection(String username, String password, String id) throws Exception;
-
- Queue getTestQueue(String testQueueName);
+ Queue getTestQueue(String testQueueName) throws NamingException;
Queue getQueueFromName(Session session, String name) throws JMSException;
@@ -86,9 +54,9 @@ public interface JmsProvider
Topic createTopicOnFanout(Connection con, String topicName) throws JMSException, URISyntaxException;
- long getQueueDepth(Connection con, Queue destination) throws Exception;
+ long getQueueDepth(Queue destination) throws Exception;
- boolean isQueueExist(Connection con, Queue destination) throws Exception;
+ boolean isQueueExist(Queue destination) throws Exception;
String getBrokerDetailsFromDefaultConnectionUrl();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/9089861a/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
----------------------------------------------------------------------
diff --git a/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
index cda5895..6a9b6a0 100755
--- a/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
+++ b/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
@@ -19,6 +19,9 @@ package org.apache.qpid.test.utils;
import java.io.File;
import java.io.UnsupportedEncodingException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.util.ArrayList;
@@ -224,7 +227,16 @@ public class QpidBrokerTestCase extends QpidTestCase
public ConnectionBuilder getConnectionBuilder()
{
- return _jmsProvider.getConnectionBuilder().setVirtualHost("test");
+ final ConnectionBuilder connectionBuilder = _jmsProvider.getConnectionBuilder()
+ .setVirtualHost("test")
+ .setTls(Boolean.getBoolean(PROFILE_USE_SSL))
+ .setPopulateJMSXUserID(true)
+ .setUsername(GUEST_USERNAME)
+ .setPassword(GUEST_PASSWORD);
+
+ return (ConnectionBuilder) Proxy.newProxyInstance(getClass().getClassLoader(),
+ new Class<?>[]{ConnectionBuilder.class},
+ new ConectionBuilderHandler(connectionBuilder, _connections));
}
/**
@@ -244,59 +256,36 @@ public class QpidBrokerTestCase extends QpidTestCase
return _jmsProvider.getConnectionFactory(options);
}
- public ConnectionFactory getConnectionFactory(String factoryName)
- throws NamingException
- {
- return _jmsProvider.getConnectionFactory(factoryName);
- }
-
- public ConnectionFactory getConnectionFactory(String factoryName, String vhost, String clientId)
- throws NamingException
- {
- return _jmsProvider.getConnectionFactory(factoryName, vhost, clientId);
- }
-
public Connection getConnection() throws JMSException, NamingException
{
- Connection connection = _jmsProvider.getConnection();
- _connections.add(connection);
- return connection;
+ return getConnection(GUEST_USERNAME, GUEST_PASSWORD);
}
public Connection getConnection(String username, String password) throws JMSException, NamingException
{
- Connection connection = _jmsProvider.getConnection(username, password);
- _connections.add(connection);
- return connection;
+ return getConnectionBuilder().setUsername(username).setPassword(password).build();
}
public Connection getConnectionWithPrefetch(int prefetch) throws Exception
{
- Connection connection = _jmsProvider.getConnectionWithPrefetch(prefetch);
- _connections.add(connection);
- return connection;
+ return getConnectionBuilder().setPrefetch(prefetch).build();
}
public Connection getConnectionWithOptions(Map<String, String> options) throws Exception
{
- Connection connection = _jmsProvider.getConnectionWithOptions(options);
- _connections.add(connection);
- return connection;
+ return getConnectionBuilder().setOptions(options).build();
}
public Connection getConnectionWithOptions(String vhost, Map<String, String> options) throws Exception
{
-
- Connection connection = _jmsProvider.getConnectionWithOptions(vhost, options);
- _connections.add(connection);
- return connection;
+ return getConnectionBuilder().setOptions(options)
+ .setVirtualHost(vhost)
+ .build();
}
public Connection getConnectionForVHost(String vhost) throws Exception
{
- Connection connection = _jmsProvider.getConnectionForVHost(vhost);
- _connections.add(connection);
- return connection;
+ return getConnectionBuilder().setVirtualHost(vhost).build();
}
public Connection getConnection(String urlString) throws Exception
@@ -306,7 +295,7 @@ public class QpidBrokerTestCase extends QpidTestCase
return connection;
}
- public Queue getTestQueue()
+ public Queue getTestQueue() throws NamingException
{
return _jmsProvider.getTestQueue(getTestQueueName());
}
@@ -395,12 +384,12 @@ public class QpidBrokerTestCase extends QpidTestCase
public long getQueueDepth(final Connection con, final Queue destination) throws Exception
{
- return _jmsProvider.getQueueDepth(con, destination);
+ return _jmsProvider.getQueueDepth(destination);
}
public boolean isQueueExist(final Connection con, final Queue destination) throws Exception
{
- return _jmsProvider.isQueueExist(con, destination);
+ return _jmsProvider.isQueueExist(destination);
}
/**
@@ -742,14 +731,13 @@ public class QpidBrokerTestCase extends QpidTestCase
protected Connection getConnectionWithSyncPublishing() throws Exception
{
- return _jmsProvider.getConnectionWithSyncPublishing();
+ return getConnectionBuilder().setSyncPublish(true).build();
}
protected Connection getClientConnection(String username, String password, String id)
throws Exception
{
- //add the connection in the list of connections
- return _jmsProvider.getClientConnection(username, password, id);
+ return getConnectionBuilder().setClientId(id).setUsername(username).setPassword(password).build();
}
/**
@@ -912,4 +900,36 @@ public class QpidBrokerTestCase extends QpidTestCase
}
}
+ private static class ConectionBuilderHandler implements InvocationHandler
+ {
+ private final ConnectionBuilder _connectionBuilder;
+ private final List<Connection> _connections;
+
+ public ConectionBuilderHandler(final ConnectionBuilder connectionBuilder,
+ final List<Connection> connections)
+ {
+ _connectionBuilder = connectionBuilder;
+ _connections = connections;
+ }
+
+ @Override
+ public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable
+ {
+ if (method.getName().equals("build"))
+ {
+ Connection connection = _connectionBuilder.build();
+ _connections.add(connection);
+ return connection;
+ }
+ else if (method.getName().equals("buildConnectionFactory"))
+ {
+ return _connectionBuilder.buildConnectionFactory();
+ }
+ else
+ {
+ method.invoke(_connectionBuilder, args);
+ return proxy;
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/9089861a/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClient0xConnectionBuilder.java
----------------------------------------------------------------------
diff --git a/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClient0xConnectionBuilder.java b/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClient0xConnectionBuilder.java
index 0c56f01..2962c19 100644
--- a/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClient0xConnectionBuilder.java
+++ b/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClient0xConnectionBuilder.java
@@ -20,8 +20,8 @@
package org.apache.qpid.test.utils;
+import java.util.Hashtable;
import java.util.Map;
-import java.util.Properties;
import java.util.TreeMap;
import javax.jms.Connection;
@@ -31,11 +31,6 @@ import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
-import org.apache.qpid.client.AMQConnectionFactory;
-import org.apache.qpid.client.AMQConnectionURL;
-import org.apache.qpid.jms.ConnectionURL;
-import org.apache.qpid.url.URLSyntaxException;
-
public class QpidJmsClient0xConnectionBuilder implements ConnectionBuilder
{
private String _clientId = "clientid";
@@ -46,9 +41,9 @@ public class QpidJmsClient0xConnectionBuilder implements ConnectionBuilder
private boolean _enableFailover;
private final Map<String, Object> _options = new TreeMap<>();
private int _reconnectAttempts = 20;
- private String _host;
- private int _port;
- private int _sslPort;
+ private String _host = "localhost";
+ private int _port = Integer.getInteger("test.port");
+ private int _sslPort = Integer.getInteger("test.port.ssl");
@Override
public ConnectionBuilder setHost(final String host)
@@ -132,74 +127,117 @@ public class QpidJmsClient0xConnectionBuilder implements ConnectionBuilder
{
if (syncPublish)
{
- _options.put(ConnectionURL.OPTIONS_SYNC_PUBLISH, "all");
+ _options.put("sync_publish", "all");
}
else
{
- _options.remove(ConnectionURL.OPTIONS_SYNC_PUBLISH);
+ _options.remove("sync_publish");
}
return this;
}
@Override
- public Connection build() throws JMSException, NamingException, URLSyntaxException
+ public ConnectionBuilder setOptions(final Map<String, String> options)
+ {
+ _options.putAll(options);
+ return this;
+ }
+
+ @Override
+ public ConnectionBuilder setPopulateJMSXUserID(final boolean populateJMSXUserID)
+ {
+ _options.put("populateJMSXUserID", String.valueOf(populateJMSXUserID));
+ return this;
+ }
+
+ @Override
+ public Connection build() throws JMSException, NamingException
{
return buildConnectionFactory().createConnection(_username, _password);
}
@Override
- public ConnectionFactory buildConnectionFactory() throws NamingException, URLSyntaxException
+ public ConnectionFactory buildConnectionFactory() throws NamingException
{
- Properties contextProperties = new Properties();
- contextProperties.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
- contextProperties.put(Context.PROVIDER_URL, System.getProperty(Context.PROVIDER_URL));
- InitialContext initialContext = null;
- ConnectionFactory connectionFactory;
- try
+ StringBuilder cUrlBuilder = new StringBuilder("amqp://");
+ if (_username != null)
{
- initialContext = new InitialContext(contextProperties);
- String jndiName = "default";
- if (_enableFailover)
- {
- jndiName = "failover";
- }
+ cUrlBuilder.append(_username);
+ }
- if (_enableTls)
- {
- jndiName += ".ssl";
- }
- connectionFactory = (ConnectionFactory) initialContext.lookup(jndiName);
+ if (_username != null || _password != null)
+ {
+ cUrlBuilder.append(":");
}
- finally
+ if (_password != null)
{
- if (initialContext != null)
- {
- initialContext.close();
- }
+ cUrlBuilder.append(_password);
}
- AMQConnectionURL curl =
- new AMQConnectionURL(((AMQConnectionFactory) connectionFactory).getConnectionURLString());
+
+ if (_username != null || _password != null)
+ {
+ cUrlBuilder.append("@");
+ }
+
+ if (_clientId != null)
+ {
+ cUrlBuilder.append(_clientId);
+ }
+
+ cUrlBuilder.append("/");
if (_virtualHost != null)
{
- curl.setVirtualHost("/" + _virtualHost);
+ cUrlBuilder.append(_virtualHost);
}
- for (Map.Entry<String, Object> entry: _options.entrySet())
+ cUrlBuilder.append("?brokerlist='tcp://").append(_host).append(":");
+ if (_enableTls)
{
- curl.setOption(entry.getKey(), String.valueOf(entry.getValue()));
+ cUrlBuilder.append(_sslPort).append("?ssl='true'");
+ }
+ else
+ {
+ cUrlBuilder.append(_port);
}
if (_enableFailover)
{
- curl.setFailoverOption("cyclecount", String.valueOf(_reconnectAttempts));
+ cUrlBuilder.append(";tcp://").append(_host).append(":");
+ if (_enableTls)
+ {
+ cUrlBuilder.append(System.getProperty("test.port.alt.ssl")).append("?ssl='true'");
+ }
+ else
+ {
+ cUrlBuilder.append(System.getProperty("test.port.alt"));
+ }
+ cUrlBuilder.append("'")
+ .append("&sync_ack='true'&sync_publish='all'&failover='roundrobin?cyclecount='")
+ .append(_reconnectAttempts)
+ .append("''");
+ }
+ else
+ {
+ cUrlBuilder.append("'");
}
- curl.setClientName(_clientId);
+ for (Map.Entry<String, Object> entry : _options.entrySet())
+ {
+ cUrlBuilder.append("&").append(entry.getKey()).append("='").append(entry.getValue()).append("'");
+ }
- curl = new AMQConnectionURL(curl.toString());
- connectionFactory = new AMQConnectionFactory(curl);
- return connectionFactory;
+ final Hashtable<Object, Object> initialContextEnvironment = new Hashtable<>();
+ initialContextEnvironment.put(Context.INITIAL_CONTEXT_FACTORY,
+ "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
+ final String factoryName = "connectionFactory";
+ initialContextEnvironment.put("connectionfactory." + factoryName, cUrlBuilder.toString());
+ return (ConnectionFactory) new InitialContext(initialContextEnvironment).lookup(factoryName);
+ }
+
+ String getBrokerDetails()
+ {
+ return "tcp://" + _host + ":" + _port;
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/9089861a/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClient0xProvider.java
----------------------------------------------------------------------
diff --git a/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClient0xProvider.java b/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClient0xProvider.java
index 452940d..f582321 100644
--- a/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClient0xProvider.java
+++ b/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClient0xProvider.java
@@ -20,9 +20,11 @@
package org.apache.qpid.test.utils;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.net.URISyntaxException;
+import java.security.AccessControlException;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
@@ -36,29 +38,9 @@ import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
-import org.apache.qpid.client.AMQConnectionFactory;
-import org.apache.qpid.client.AMQConnectionURL;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.jms.ConnectionURL;
public class QpidJmsClient0xProvider implements JmsProvider
{
- private static final String DEFAULT_INITIAL_CONTEXT = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory";
- static
- {
- String initialContext = System.getProperty(Context.INITIAL_CONTEXT_FACTORY);
-
- if (initialContext == null || initialContext.length() == 0)
- {
- System.setProperty(Context.INITIAL_CONTEXT_FACTORY, DEFAULT_INITIAL_CONTEXT);
- }
- }
-
- private final Hashtable<Object, Object> _initialContextEnvironment = new Hashtable<>();
private final AmqpManagementFacade _managementFacade;
public QpidJmsClient0xProvider(AmqpManagementFacade managementFacade)
@@ -69,14 +51,8 @@ public class QpidJmsClient0xProvider implements JmsProvider
@Override
public ConnectionFactory getConnectionFactory() throws NamingException
{
- if (Boolean.getBoolean(QpidBrokerTestCase.PROFILE_USE_SSL))
- {
- return getConnectionFactory("default.ssl");
- }
- else
- {
- return getConnectionFactory("default");
- }
+ return getConnectionBuilder().setTls(Boolean.getBoolean(QpidBrokerTestCase.PROFILE_USE_SSL))
+ .buildConnectionFactory();
}
@Override
@@ -85,142 +61,54 @@ public class QpidJmsClient0xProvider implements JmsProvider
throw new UnsupportedOperationException();
}
- @Override
- public ConnectionFactory getConnectionFactory(String factoryName)
- throws NamingException
- {
- return getConnectionFactory(factoryName, "test", "clientid");
- }
-
- @Override
- public ConnectionFactory getConnectionFactory(String factoryName, String vhost, String clientId)
- throws NamingException
- {
- return getConnectionFactory(factoryName, vhost, clientId, Collections.<String, String>emptyMap());
- }
-
- @Override
- public ConnectionFactory getConnectionFactory(String factoryName,
- String vhost,
- String clientId,
- Map<String, String> options)
- throws NamingException
- {
-
- return (ConnectionFactory) new InitialContext(_initialContextEnvironment).lookup(factoryName);
- }
-
- @Override
- public Connection getConnection() throws JMSException, NamingException
+ private Connection getConnection() throws JMSException, NamingException
{
return getConnection(QpidBrokerTestCase.GUEST_USERNAME, QpidBrokerTestCase.GUEST_PASSWORD);
}
- @Override
- public Connection getConnection(String username, String password) throws JMSException, NamingException
- {
- Connection con = getConnectionFactory().createConnection(username, password);
- return con;
- }
-
- @Override
- public Connection getClientConnection(String username, String password, String id)
- throws Exception
- {
- Connection con = ((AMQConnectionFactory) getConnectionFactory()).createConnection(username,
- password,
- id);
- return con;
- }
-
- @Override
- public Connection getConnectionWithPrefetch(int prefetch) throws Exception
+ private Connection getConnection(String username, String password) throws JMSException, NamingException
{
- return getConnectionWithOptions(Collections.singletonMap("maxprefetch", String.valueOf(prefetch)));
- }
-
- @Override
- public Connection getConnectionWithOptions(Map<String, String> options) throws Exception
- {
- return getConnectionWithOptions("test", options);
- }
-
- @Override
- public Connection getConnectionWithOptions(String vhost, Map<String, String> options) throws Exception
- {
- ConnectionURL curl =
- new AMQConnectionURL(((AMQConnectionFactory) getConnectionFactory()).getConnectionURLString());
- for (Map.Entry<String, String> entry : options.entrySet())
- {
- curl.setOption(entry.getKey(), entry.getValue());
- }
-
- curl = new AMQConnectionURL(curl.toString());
- curl.setUsername(QpidBrokerTestCase.GUEST_USERNAME);
- curl.setPassword(QpidBrokerTestCase.GUEST_PASSWORD);
- curl.setVirtualHost(vhost);
- Connection connection = new AMQConnectionFactory(curl).createConnection(curl.getUsername(), curl.getPassword());
-
- return connection;
- }
-
- @Override
- public Connection getConnectionForVHost(String vhost)
- throws Exception
- {
- return getConnectionForVHost(vhost, QpidBrokerTestCase.GUEST_USERNAME, QpidBrokerTestCase.GUEST_PASSWORD);
- }
- @Override
- public Connection getConnectionForVHost(String vhost, String username, String password)
- throws Exception
- {
- ConnectionURL curl =
- new AMQConnectionURL(((AMQConnectionFactory) getConnectionFactory()).getConnectionURLString());
- curl.setVirtualHost("/" + vhost);
- curl = new AMQConnectionURL(curl.toString());
-
- curl.setUsername(username);
- curl.setPassword(password);
- Connection connection =
- new AMQConnectionFactory(curl).createConnection(curl.getUsername(), curl.getPassword());
-
- return connection;
+ return getConnectionBuilder().setUsername(username).setPassword(password).build();
}
@Override
public Connection getConnection(String urlString) throws Exception
{
- ConnectionURL url = new AMQConnectionURL(urlString);
- Connection connection = new AMQConnectionFactory(url).createConnection(url.getUsername(), url.getPassword());
- return connection;
- }
-
- @Override
- public Connection getConnectionWithSyncPublishing() throws Exception
- {
- Map<String, String> options = new HashMap<>();
- options.put(ConnectionURL.OPTIONS_SYNC_PUBLISH, "all");
- return getConnectionWithOptions(options);
+ final Hashtable<Object, Object> initialContextEnvironment = new Hashtable<>();
+ initialContextEnvironment.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
+ final String factoryName = "connectionFactory";
+ initialContextEnvironment.put("connectionfactory." + factoryName, urlString);
+ ConnectionFactory connectionFactory =
+ (ConnectionFactory) new InitialContext(initialContextEnvironment).lookup(factoryName);
+ return connectionFactory.createConnection();
}
@Override
- public Queue getTestQueue(final String testQueueName)
+ public Queue getTestQueue(final String testQueueName) throws NamingException
{
- return new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, testQueueName);
+ return createReflectively("org.apache.qpid.client.AMQQueue", "amq.direct", testQueueName);
}
@Override
public Queue getQueueFromName(Session session, String name) throws JMSException
{
- return new AMQQueue("", name);
+ return createReflectively("org.apache.qpid.client.AMQQueue", "", name);
}
@Override
public Queue createTestQueue(Session session, String queueName) throws JMSException
{
- Queue amqQueue = getTestQueue(queueName);
+ Queue amqQueue = null;
+ try
+ {
+ amqQueue = getTestQueue(queueName);
+ }
+ catch (NamingException e)
+ {
+ throw new RuntimeException(e);
+ }
session.createConsumer(amqQueue).close();
return amqQueue;
}
@@ -228,7 +116,7 @@ public class QpidJmsClient0xProvider implements JmsProvider
@Override
public Topic getTestTopic(final String testQueueName)
{
- return new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, testQueueName);
+ return createReflectively("org.apache.qpid.client.AMQTopic", "amq.topic", testQueueName);
}
@Override
@@ -240,84 +128,129 @@ public class QpidJmsClient0xProvider implements JmsProvider
@Override
public Topic createTopicOnDirect(final Connection con, String topicName) throws JMSException, URISyntaxException
{
- return new AMQTopic(
- "direct://amq.direct/"
- + topicName
- + "/"
- + topicName
- + "?routingkey='"
- + topicName
- + "',exclusive='true',autodelete='true'");
+ return createReflectively("org.apache.qpid.client.AMQTopic",
+ "direct://amq.direct/"
+ + topicName
+ + "/"
+ + topicName
+ + "?routingkey='"
+ + topicName
+ + "',exclusive='true',autodelete='true'");
}
- @Override
- public Topic createTopicOnFanout(final Connection con, String topicName) throws JMSException, URISyntaxException
- {
- return new AMQTopic(
- "fanout://amq.fanout/"
- + topicName
- + "/"
- + topicName
- + "?routingkey='"
- + topicName
- + "',exclusive='true',autodelete='true'");
- }
-
- @Override
- public long getQueueDepth(final Connection con, final Queue destination) throws Exception
+ private <T> T createReflectively(String className, Object ...args)
{
- Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
try
{
- return ((AMQSession<?, ?>) session).getQueueDepth((AMQDestination) destination);
+ Class<?> topicClass = Class.forName(className);
+ Class[] classes = new Class[args.length];
+ for (int i = 0; i < args.length; ++i)
+ {
+ classes[i] = args[i].getClass();
+ }
+ Constructor<?> constructor = topicClass.getConstructor(classes);
+ return (T) constructor.newInstance(args);
}
- finally
+ catch (IllegalAccessException | AccessControlException | InvocationTargetException | InstantiationException | NoSuchMethodException | ClassNotFoundException e)
{
- session.close();
+ throw new RuntimeException(e);
}
+
+ }
+
+ @Override
+ public Topic createTopicOnFanout(final Connection con, String topicName) throws JMSException, URISyntaxException
+ {
+ return createReflectively("org.apache.qpid.client.AMQTopic", "fanout://amq.fanout/"
+ + topicName
+ + "/"
+ + topicName
+ + "?routingkey='"
+ + topicName
+ + "',exclusive='true',autodelete='true'");
}
@Override
- public boolean isQueueExist(final Connection con, final Queue destination) throws Exception
+ public long getQueueDepth(final Queue destination) throws Exception
{
- Queue queue = new AMQQueue("", destination.getQueueName());
- Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final String escapedName = destination.getQueueName().replaceAll("([/\\\\])", "\\\\$1");
+ Connection connection = getConnection();
try
{
- return ((AMQSession<?, ?>) session).isQueueBound((AMQDestination) queue);
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ try
+ {
+ Map<String, Object> arguments = Collections.singletonMap("statistics",
+ Collections.singletonList("queueDepthMessages"));
+ Object statistics = _managementFacade.performOperationUsingAmqpManagement(escapedName,
+ "getStatistics",
+ session,
+ "org.apache.qpid.Queue",
+ arguments);
+
+ Map<String, Object> statisticsMap = (Map<String, Object>) statistics;
+ return ((Number) statisticsMap.get("queueDepthMessages")).intValue();
+ }
+ finally
+ {
+ session.close();
+ }
}
finally
{
- session.close();
+ connection.close();
}
}
@Override
- public String getBrokerDetailsFromDefaultConnectionUrl()
+ public boolean isQueueExist(final Queue destination) throws Exception
{
+ final String escapedName = destination.getQueueName().replaceAll("([/\\\\])", "\\\\$1");
+ Connection connection = getConnection();
try
{
- AMQConnectionFactory factory = (AMQConnectionFactory) getConnectionFactory();
- ConnectionURL connectionURL = factory.getConnectionURL();
- if (connectionURL.getBrokerCount() > 0)
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ try
+ {
+ _managementFacade.performOperationUsingAmqpManagement(escapedName,
+ "READ",
+ session,
+ "org.apache.qpid.Queue",
+ Collections.emptyMap());
+ return true;
+ }
+ catch (AmqpManagementFacade.OperationUnsuccessfulException e)
{
- return connectionURL
- .getBrokerDetails(0)
- .toString();
+ if (e.getStatusCode() == 404)
+ {
+ return false;
+ }
+ else
+ {
+ throw e;
+ }
}
- else
+ finally
{
- throw new RuntimeException("No broker details are available.");
+ session.close();
}
}
- catch (NamingException e)
+ finally
{
- throw new RuntimeException("No broker details are available.", e);
+ connection.close();
}
}
@Override
- public ConnectionBuilder getConnectionBuilder()
+ public String getBrokerDetailsFromDefaultConnectionUrl()
+ {
+ return getConnectionBuilder().getBrokerDetails();
+ }
+
+ @Override
+ public QpidJmsClient0xConnectionBuilder getConnectionBuilder()
{
return new QpidJmsClient0xConnectionBuilder();
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/9089861a/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClientConnectionBuilder.java
----------------------------------------------------------------------
diff --git a/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClientConnectionBuilder.java b/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClientConnectionBuilder.java
index 459c497..096b854 100644
--- a/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClientConnectionBuilder.java
+++ b/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClientConnectionBuilder.java
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
+import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
@@ -161,6 +162,20 @@ public class QpidJmsClientConnectionBuilder implements ConnectionBuilder
}
@Override
+ public ConnectionBuilder setOptions(final Map<String, String> options)
+ {
+ _options.putAll(options);
+ return this;
+ }
+
+ @Override
+ public ConnectionBuilder setPopulateJMSXUserID(final boolean populateJMSXUserID)
+ {
+ _options.put("jms.populateJMSXUserID", String.valueOf(populateJMSXUserID));
+ return this;
+ }
+
+ @Override
public Connection build() throws NamingException, JMSException
{
return buildConnectionFactory().createConnection();
@@ -170,7 +185,10 @@ public class QpidJmsClientConnectionBuilder implements ConnectionBuilder
public ConnectionFactory buildConnectionFactory() throws NamingException
{
final Hashtable<Object, Object> initialContextEnvironment = new Hashtable<>();
- final String factoryName;
+ initialContextEnvironment.put(Context.INITIAL_CONTEXT_FACTORY,
+ "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
+
+ final StringBuilder connectionUrlBuilder = new StringBuilder();
final Map<String, Object> options = new TreeMap<>();
options.putAll(_options);
@@ -180,36 +198,30 @@ public class QpidJmsClientConnectionBuilder implements ConnectionBuilder
{
options.put("failover.maxReconnectAttempts", "2");
}
- final StringBuilder stem = new StringBuilder("failover:(amqp://")
+ connectionUrlBuilder.append("failover:(amqp://")
.append(_host)
.append(":")
.append(_port)
.append(",amqp://localhost:")
.append(System.getProperty("test.port.alt"))
.append(")");
- appendOptions(options, stem);
-
- initialContextEnvironment.put("property.connectionfactory.failover.remoteURI",
- stem.toString());
- factoryName = "failover";
+ appendOptions(options, connectionUrlBuilder);
}
else if (!_enableTls)
{
- final StringBuilder stem =
- new StringBuilder("amqp://").append(_host).append(":").append(_port);
+ connectionUrlBuilder.append("amqp://").append(_host).append(":").append(_port);
- appendOptions(options, stem);
-
- initialContextEnvironment.put("property.connectionfactory.default.remoteURI", stem.toString());
- factoryName = "default";
+ appendOptions(options, connectionUrlBuilder);
}
else
{
- final StringBuilder stem = new StringBuilder("amqps://").append(_host).append(":").append(_sslPort);
- appendOptions(options, stem);
- initialContextEnvironment.put("connectionfactory.default.ssl", stem.toString());
- factoryName = "default.ssl";
+ connectionUrlBuilder.append("amqps://").append(_host).append(":").append(_sslPort);
+ appendOptions(options, connectionUrlBuilder);
}
+
+ final String factoryName = "connection";
+ initialContextEnvironment.put("connectionfactory." + factoryName, connectionUrlBuilder.toString());
+
return (ConnectionFactory) new InitialContext(initialContextEnvironment).lookup(factoryName);
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/9089861a/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClientProvider.java
----------------------------------------------------------------------
diff --git a/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClientProvider.java b/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClientProvider.java
index 6d26b63..1e590be 100644
--- a/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClientProvider.java
+++ b/systests/src/main/java/org/apache/qpid/test/utils/QpidJmsClientProvider.java
@@ -20,29 +20,18 @@
package org.apache.qpid.test.utils;
-import java.io.UnsupportedEncodingException;
import java.net.URISyntaxException;
-import java.net.URLEncoder;
import java.util.Collections;
import java.util.HashMap;
-import java.util.Hashtable;
-import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
-import javax.jms.TemporaryQueue;
import javax.jms.Topic;
-import javax.naming.InitialContext;
import javax.naming.NamingException;
public class QpidJmsClientProvider implements JmsProvider
@@ -58,54 +47,13 @@ public class QpidJmsClientProvider implements JmsProvider
@Override
public ConnectionFactory getConnectionFactory() throws NamingException
{
- return getConnectionFactory(Collections.<String, String>emptyMap());
- }
-
- @Override
- public ConnectionFactory getConnectionFactory(String factoryName) throws NamingException
- {
- return getConnectionFactory(factoryName, Collections.<String, String>emptyMap());
+ return getConnectionFactory(Collections.emptyMap());
}
@Override
public ConnectionFactory getConnectionFactory(Map<String, String> options) throws NamingException
{
-
- if (Boolean.getBoolean(QpidBrokerTestCase.PROFILE_USE_SSL))
- {
- return getConnectionFactory("default.ssl", options);
- }
- else
- {
- return getConnectionFactory("default", options);
- }
- }
-
- @Override
- public ConnectionFactory getConnectionFactory(String factoryName, String vhost, String clientId) throws NamingException
- {
- return getConnectionFactory(factoryName, vhost, clientId, Collections.<String, String>emptyMap());
- }
-
- @Override
- public ConnectionFactory getConnectionFactory(String factoryName,
- String vhost,
- String clientId,
- Map<String, String> options)
- throws NamingException
- {
-
- Map<String, String> actualOptions = new LinkedHashMap<>();
- actualOptions.put("amqp.vhost", vhost);
- actualOptions.put("jms.clientID", clientId);
- actualOptions.putAll(options);
- return getConnectionFactory(factoryName, actualOptions);
- }
-
- private ConnectionFactory getConnectionFactory(final String factoryName, Map<String, String> options)
- throws NamingException
- {
-
+ boolean useSsl = Boolean.getBoolean(QpidBrokerTestCase.PROFILE_USE_SSL);
if (!options.containsKey("amqp.vhost"))
{
options = new HashMap<>(options);
@@ -131,99 +79,18 @@ public class QpidJmsClientProvider implements JmsProvider
options.put("jms.populateJMSXUserID", "true");
}
- final Hashtable<Object, Object> initialContextEnvironment = new Hashtable<>();
- if ("failover".equals(factoryName))
- {
- if (!options.containsKey("failover.maxReconnectAttempts"))
- {
- options.put("failover.maxReconnectAttempts", "2");
- }
- final StringBuilder stem = new StringBuilder("failover:(amqp://localhost:")
- .append(System.getProperty("test.port"))
- .append(",amqp://localhost:")
- .append(System.getProperty("test.port.alt"))
- .append(")");
- appendOptions(options, stem);
-
- initialContextEnvironment.put("property.connectionfactory.failover.remoteURI",
- stem.toString());
- }
- else if ("default".equals(factoryName))
- {
- final StringBuilder stem =
- new StringBuilder("amqp://localhost:").append(System.getProperty("test.port"));
-
- appendOptions(options, stem);
-
- initialContextEnvironment.put("property.connectionfactory.default.remoteURI", stem.toString());
- }
- else if ("default.ssl".equals(factoryName))
- {
-
- final StringBuilder stem = new StringBuilder("amqps://localhost:").append(String.valueOf(System.getProperty("test.port.ssl")));
- appendOptions(options, stem);
- initialContextEnvironment.put("connectionfactory.default.ssl", stem.toString());
- }
- return (ConnectionFactory) new InitialContext(initialContextEnvironment).lookup(factoryName);
+ return getConnectionBuilder().setTls(useSsl).setOptions(options).buildConnectionFactory();
}
- @Override
- public Connection getConnection() throws JMSException, NamingException
+ private Connection getConnection() throws JMSException, NamingException
{
return getConnection(QpidBrokerTestCase.GUEST_USERNAME, QpidBrokerTestCase.GUEST_PASSWORD);
}
- @Override
- public Connection getConnection(String username, String password) throws JMSException, NamingException
- {
- Connection con = getConnectionFactory().createConnection(username, password);
- return con;
- }
- @Override
- public Connection getConnectionWithPrefetch(int prefetch) throws Exception
+ private Connection getConnection(String username, String password) throws JMSException, NamingException
{
- String factoryName = Boolean.getBoolean(QpidBrokerTestCase.PROFILE_USE_SSL) ? "default.ssl" : "default";
-
- final Map<String, String> options =
- Collections.singletonMap("jms.prefetchPolicy.all", String.valueOf(prefetch));
- final ConnectionFactory connectionFactory = getConnectionFactory(factoryName, "test", getNextClientId(), options);
- return connectionFactory.createConnection(QpidBrokerTestCase.GUEST_USERNAME,
- QpidBrokerTestCase.GUEST_PASSWORD);
- }
-
- @Override
- public Connection getConnectionWithOptions(Map<String, String> options) throws Exception
- {
- return getConnectionWithOptions("test", options);
- }
-
- @Override
- public Connection getConnectionWithOptions(String vhost, Map<String, String> options) throws Exception
- {
- return getConnectionFactory(Boolean.getBoolean(QpidBrokerTestCase.PROFILE_USE_SSL)
- ? "default.ssl"
- : "default",
- vhost,
- getNextClientId(),
- options).createConnection(QpidBrokerTestCase.GUEST_USERNAME,
- QpidBrokerTestCase.GUEST_PASSWORD);
- }
-
- @Override
- public Connection getConnectionForVHost(String vhost)
- throws Exception
- {
- return getConnectionForVHost(vhost, QpidBrokerTestCase.GUEST_USERNAME, QpidBrokerTestCase.GUEST_PASSWORD);
- }
-
- @Override
- public Connection getConnectionForVHost(String vhost, String username, String password)
- throws Exception
- {
- return getConnectionFactory(Boolean.getBoolean(QpidBrokerTestCase.PROFILE_USE_SSL)
- ? "default.ssl"
- : "default", vhost, getNextClientId()).createConnection(username, password);
+ return getConnectionFactory().createConnection(username, password);
}
@Override
@@ -328,122 +195,73 @@ public class QpidJmsClientProvider implements JmsProvider
}
@Override
- public long getQueueDepth(final Connection con, final Queue destination) throws Exception
+ public long getQueueDepth(final Queue destination) throws Exception
{
- Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final String escapedName = destination.getQueueName().replaceAll("([/\\\\])", "\\\\$1");
+ Connection connection = getConnection();
try
{
-
- MessageProducer producer = session.createProducer(session.createQueue("$management"));
- final TemporaryQueue responseQ = session.createTemporaryQueue();
- MessageConsumer consumer = session.createConsumer(responseQ);
- MapMessage message = session.createMapMessage();
- message.setStringProperty("index", "object-path");
- final String escapedName = destination.getQueueName().replaceAll("([/\\\\])", "\\\\$1");
- message.setStringProperty("key", escapedName);
- message.setStringProperty("type", "org.apache.qpid.Queue");
- message.setStringProperty("operation", "getStatistics");
- message.setStringProperty("statistics", "[\"queueDepthMessages\"]");
-
- message.setJMSReplyTo(responseQ);
-
- producer.send(message);
-
- Message response = consumer.receive();
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
try
{
- if (response instanceof MapMessage)
- {
- return ((MapMessage) response).getLong("queueDepthMessages");
- }
- else if (response instanceof ObjectMessage)
- {
- Object body = ((ObjectMessage) response).getObject();
- if (body instanceof Map)
- {
- return Long.valueOf(((Map) body).get("queueDepthMessages").toString());
- }
- else
- {
- throw new IllegalArgumentException("Cannot parse the results from a management operation."
- + " Unexpected message object type : " + body);
- }
- }
- else
- {
- throw new IllegalArgumentException("Cannot parse the results from a management operation."
- + " Unexpected response message type : " + response.getClass());
- }
+ Map<String, Object> arguments = Collections.singletonMap("statistics",
+ Collections.singletonList("queueDepthMessages"));
+ Object statistics = _managementFacade.performOperationUsingAmqpManagement(escapedName,
+ "getStatistics",
+ session,
+ "org.apache.qpid.Queue",
+ arguments);
+
+ Map<String, Object> statisticsMap = (Map<String, Object>) statistics;
+ return ((Number) statisticsMap.get("queueDepthMessages")).intValue();
}
finally
{
- consumer.close();
- responseQ.delete();
+ session.close();
}
}
finally
{
- session.close();
+ connection.close();
}
}
@Override
- public boolean isQueueExist(final Connection con, final Queue destination) throws Exception
+ public boolean isQueueExist(final Queue destination) throws Exception
{
- Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final String escapedName = destination.getQueueName().replaceAll("([/\\\\])", "\\\\$1");
+ Connection connection = getConnection();
try
{
- MessageProducer producer = session.createProducer(session.createQueue("$management"));
- final TemporaryQueue responseQ = session.createTemporaryQueue();
- MessageConsumer consumer = session.createConsumer(responseQ);
- MapMessage message = session.createMapMessage();
- message.setStringProperty("index", "object-path");
- final String escapedName = destination.getQueueName().replaceAll("([/\\\\])", "\\\\$1");
- message.setStringProperty("key", escapedName);
- message.setStringProperty("type", "org.apache.qpid.Queue");
- message.setStringProperty("operation", "READ");
-
- message.setJMSReplyTo(responseQ);
-
- producer.send(message);
-
- Message response = consumer.receive();
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
try
{
- int statusCode = response.getIntProperty("statusCode");
- switch(statusCode)
- {
- case 200:
- return true;
- case 404:
- return false;
- default:
- throw new RuntimeException(String.format("Unexpected response for queue query '%s' : %d", destination.getQueueName(), statusCode));
- }
+ _managementFacade.performOperationUsingAmqpManagement(escapedName,
+ "READ",
+ session,
+ "org.apache.qpid.Queue",
+ Collections.emptyMap());
+ return true;
}
- finally
+ catch (AmqpManagementFacade.OperationUnsuccessfulException e)
{
- consumer.close();
- responseQ.delete();
+ if (e.getStatusCode() == 404)
+ {
+ return false;
+ }
+ else
+ {
+ throw e;
+ }
}
}
finally
{
- session.close();
+ connection.close();
}
- }
- @Override
- public Connection getConnectionWithSyncPublishing() throws Exception
- {
- return getConnection();
- }
-
- @Override
- public Connection getClientConnection(String username, String password, String id)
- throws Exception
- {
- return getConnectionFactory("default", "test", id).createConnection(username, password);
}
@Override
@@ -458,31 +276,6 @@ public class QpidJmsClientProvider implements JmsProvider
return new QpidJmsClientConnectionBuilder();
}
- private void appendOptions(final Map<String, String> actualOptions, final StringBuilder stem)
- {
- boolean first = true;
- for(Map.Entry<String, String> option : actualOptions.entrySet())
- {
- if(first)
- {
- stem.append('?');
- first = false;
- }
- else
- {
- stem.append('&');
- }
- try
- {
- stem.append(option.getKey()).append('=').append(URLEncoder.encode(option.getValue(), "UTF-8"));
- }
- catch (UnsupportedEncodingException e)
- {
- throw new RuntimeException(e);
- }
- }
- }
-
private String getNextClientId()
{
return "clientid-" + CLIENTID_COUNTER.getAndIncrement();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/9089861a/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java b/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
index e46db0e..dd04d6d 100644
--- a/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
+++ b/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
@@ -1162,8 +1162,6 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Exception
private AMQConnection createConnectionWithFailover(Map<String,String> connectionOptions) throws NamingException, JMSException, URLSyntaxException
{
- BrokerDetails origBrokerDetails = ((AMQConnectionFactory) getConnectionFactory("default")).getConnectionURL().getBrokerDetails(0);
-
String retries = "200";
String connectdelay = "1000";
String cycleCount = "2";
@@ -1171,7 +1169,7 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Exception
String newUrlFormat="amqp://username:password@clientid/test?brokerlist=" +
"'tcp://%s:%s?retries='%s'&connectdelay='%s''&failover='singlebroker?cyclecount='%s''";
- String newUrl = String.format(newUrlFormat, origBrokerDetails.getHost(), origBrokerDetails.getPort(),
+ String newUrl = String.format(newUrlFormat, "localhost", getDefaultAmqpPort(),
retries, connectdelay, cycleCount);
if (connectionOptions != null)
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/9089861a/systests/src/test/java/org/apache/qpid/client/message/ObjectMessageClassWhitelistingTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/client/message/ObjectMessageClassWhitelistingTest.java b/systests/src/test/java/org/apache/qpid/client/message/ObjectMessageClassWhitelistingTest.java
index d1ebb2e..391f165 100644
--- a/systests/src/test/java/org/apache/qpid/client/message/ObjectMessageClassWhitelistingTest.java
+++ b/systests/src/test/java/org/apache/qpid/client/message/ObjectMessageClassWhitelistingTest.java
@@ -205,7 +205,7 @@ public class ObjectMessageClassWhitelistingTest extends QpidBrokerTestCase
doTestBlackListedEnclosedClassTest(c, new NestedClass(TEST_VALUE));
}
- private void doTestWhiteListedEnclosedClassTest(Connection c, Serializable content) throws JMSException
+ private void doTestWhiteListedEnclosedClassTest(Connection c, Serializable content) throws Exception
{
c.start();
Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -225,7 +225,7 @@ public class ObjectMessageClassWhitelistingTest extends QpidBrokerTestCase
assertEquals("Received object has unexpected content", content, receivedObject);
}
- private void doTestBlackListedEnclosedClassTest(final Connection c, final Serializable content) throws JMSException
+ private void doTestBlackListedEnclosedClassTest(final Connection c, final Serializable content) throws Exception
{
c.start();
Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/9089861a/systests/src/test/java/org/apache/qpid/client/ssl/SSLTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/client/ssl/SSLTest.java b/systests/src/test/java/org/apache/qpid/client/ssl/SSLTest.java
index 5703473..edd96b9 100644
--- a/systests/src/test/java/org/apache/qpid/client/ssl/SSLTest.java
+++ b/systests/src/test/java/org/apache/qpid/client/ssl/SSLTest.java
@@ -107,7 +107,7 @@ public class SSLTest extends QpidBrokerTestCase
options.put("transport.trustStoreLocation", TRUSTSTORE);
options.put("transport.trustStorePassword", TRUSTSTORE_PASSWORD);
- con = getConnectionWithOptions(options);
+ con = getConnectionBuilder().setTls(true).setOptions(options).build();
}
else
{
@@ -432,7 +432,7 @@ public class SSLTest extends QpidBrokerTestCase
final Map<String, String> options = new HashMap<>();
options.put("transport.trustStoreLocation", TRUSTSTORE);
options.put("transport.trustStorePassword", TRUSTSTORE_PASSWORD);
- con = getConnectionWithOptions(options);
+ con = getConnectionBuilder().setTls(true).setOptions(options).build();
}
else
{
@@ -499,7 +499,7 @@ public class SSLTest extends QpidBrokerTestCase
options.put("transport.trustStoreLocation", TRUSTSTORE);
options.put("transport.trustStorePassword", TRUSTSTORE_PASSWORD);
- con = getConnectionWithOptions(options);
+ con = getConnectionBuilder().setTls(true).setOptions(options).build();
}
@@ -562,7 +562,7 @@ public class SSLTest extends QpidBrokerTestCase
options.put("transport.trustStoreLocation", TRUSTSTORE);
options.put("transport.trustStorePassword", TRUSTSTORE_PASSWORD);
- con = getConnectionWithOptions(options);
+ con = getConnectionBuilder().setTls(true).setOptions(options).build();
}
else
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/9089861a/systests/src/test/java/org/apache/qpid/server/AbruptClientDisconnectTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/server/AbruptClientDisconnectTest.java b/systests/src/test/java/org/apache/qpid/server/AbruptClientDisconnectTest.java
index b3a3f9f..0cf5492 100644
--- a/systests/src/test/java/org/apache/qpid/server/AbruptClientDisconnectTest.java
+++ b/systests/src/test/java/org/apache/qpid/server/AbruptClientDisconnectTest.java
@@ -223,7 +223,7 @@ public class AbruptClientDisconnectTest extends QpidBrokerTestCase
setSystemProperty("test.port", String.valueOf(localPort));
setSystemProperty("test.port.alt", String.valueOf(localPort));
- Connection tunneledConnection = getConnectionFactory("default").createConnection(GUEST_USERNAME, GUEST_PASSWORD);
+ Connection tunneledConnection = getConnection();
_tcpTunneler.addClientListener(clientMonitor);
final AtomicReference<JMSException> _exception = new AtomicReference<>();
tunneledConnection.setExceptionListener(new ExceptionListener()
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/9089861a/systests/src/test/java/org/apache/qpid/server/message/MessageProtocolConversionTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/server/message/MessageProtocolConversionTest.java b/systests/src/test/java/org/apache/qpid/server/message/MessageProtocolConversionTest.java
index 3e2144d..a796852 100644
--- a/systests/src/test/java/org/apache/qpid/server/message/MessageProtocolConversionTest.java
+++ b/systests/src/test/java/org/apache/qpid/server/message/MessageProtocolConversionTest.java
@@ -25,6 +25,8 @@ import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import javax.jms.*;
+import javax.naming.NamingException;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -58,18 +60,19 @@ public class MessageProtocolConversionTest extends QpidBrokerTestCase
_connection_0_9_1 = getConnection();
}
- public void test0_9_1_to_0_10_conversion() throws JMSException, QpidException
+ public void test0_9_1_to_0_10_conversion() throws JMSException, QpidException, NamingException
{
doConversionTests(_connection_0_9_1, _connection_0_10);
}
- public void test_0_10_to_0_9_1_conversion() throws JMSException, QpidException
+ public void test_0_10_to_0_9_1_conversion() throws JMSException, QpidException, NamingException
{
doConversionTests(_connection_0_10, _connection_0_9_1);
}
- private void doConversionTests(Connection producerConn, Connection consumerConn) throws JMSException, QpidException
+ private void doConversionTests(Connection producerConn, Connection consumerConn)
+ throws JMSException, QpidException, NamingException
{
Session producerSession = producerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session consumerSession = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/9089861a/systests/src/test/java/org/apache/qpid/server/security/auth/manager/ExternalAuthenticationTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/server/security/auth/manager/ExternalAuthenticationTest.java b/systests/src/test/java/org/apache/qpid/server/security/auth/manager/ExternalAuthenticationTest.java
index 7a6a336..53c7a9a 100644
--- a/systests/src/test/java/org/apache/qpid/server/security/auth/manager/ExternalAuthenticationTest.java
+++ b/systests/src/test/java/org/apache/qpid/server/security/auth/manager/ExternalAuthenticationTest.java
@@ -373,7 +373,7 @@ public class ExternalAuthenticationTest extends QpidBrokerTestCase
{
if(isBroker10())
{
- System.setProperty("test.port.ssl", ""+getDefaultBroker().getAmqpTlsPort());
+ System.setProperty("test.port.ssl", "" + getDefaultBroker().getAmqpTlsPort());
Map<String, String> options = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/9089861a/systests/src/test/java/org/apache/qpid/systest/MessageCompressionTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/systest/MessageCompressionTest.java b/systests/src/test/java/org/apache/qpid/systest/MessageCompressionTest.java
index 1d048ae..f128aad 100644
--- a/systests/src/test/java/org/apache/qpid/systest/MessageCompressionTest.java
+++ b/systests/src/test/java/org/apache/qpid/systest/MessageCompressionTest.java
@@ -53,6 +53,7 @@ import org.apache.qpid.test.utils.TestBrokerConfiguration;
public class MessageCompressionTest extends QpidBrokerTestCase
{
+ private static String VIRTUAL_HOST = "test";
private RestTestHelper _restTestHelper;
@Override
@@ -189,13 +190,12 @@ public class MessageCompressionTest extends QpidBrokerTestCase
String messageText = createMessageText();
Connection senderConnection = getConnection(true);
- String virtualPath = ((AMQConnectionFactory) getConnectionFactory()).getVirtualPath();
Session session = senderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue testQueue = createTestQueue(session);
publishMessage(senderConnection, messageText, testQueue);
- String queueRelativePath = "queue" + virtualPath + virtualPath + "/" + testQueue.getQueueName();
+ String queueRelativePath = "queue/" + VIRTUAL_HOST + "/" + VIRTUAL_HOST + "/" + testQueue.getQueueName();
List<Map<String, Object>> messages = _restTestHelper.getJsonAsList(queueRelativePath + "/getMessageInfo");
assertEquals("Unexpected number of messages", 1, messages.size());
@@ -218,13 +218,12 @@ public class MessageCompressionTest extends QpidBrokerTestCase
String messageText = createMessageText();
Connection senderConnection = getConnection(true);
- String virtualPath = ((AMQConnectionFactory) getConnectionFactory()).getVirtualPath();
Session session = senderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue testQueue = createTestQueue(session);
publishMessage(senderConnection, messageText, testQueue);
- String queueRelativePath = "queue" + virtualPath + virtualPath + "/" + testQueue.getQueueName();
+ String queueRelativePath = "queue/" + VIRTUAL_HOST + "/" + VIRTUAL_HOST + "/" + testQueue.getQueueName();
List<Map<String, Object>> messages = _restTestHelper.getJsonAsList(queueRelativePath + "/getMessageInfo");
assertEquals("Unexpected number of messages", 1, messages.size());
@@ -246,13 +245,12 @@ public class MessageCompressionTest extends QpidBrokerTestCase
String messageText = createMessageText();
Connection senderConnection = getConnection(true);
- String virtualPath = ((AMQConnectionFactory)getConnectionFactory()).getVirtualPath();
Session session = senderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue testQueue = createTestQueue(session);
publishMessage(senderConnection, messageText, testQueue);
- String queueRelativePath = "queue" + virtualPath + virtualPath + "/" + testQueue.getQueueName();
+ String queueRelativePath = "queue/" + VIRTUAL_HOST + "/" + VIRTUAL_HOST + "/" + testQueue.getQueueName();
List<Map<String, Object>> messages = _restTestHelper.getJsonAsList(queueRelativePath + "/getMessageInfo");
assertEquals("Unexpected number of messages", 1, messages.size());
@@ -284,14 +282,13 @@ public class MessageCompressionTest extends QpidBrokerTestCase
doActualSetUp();
Connection senderConnection = getConnection(true);
- String virtualPath = ((AMQConnectionFactory)getConnectionFactory()).getVirtualPath();
Session session = senderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue testQueue = createTestQueue(session);
Map<String, Object> mapToSend = createMapToSend();
publishMapMessage(senderConnection, mapToSend, testQueue);
- String queueRelativePath = "queue" + virtualPath + virtualPath + "/" + testQueue.getQueueName();
+ String queueRelativePath = "queue/" + VIRTUAL_HOST + "/" + VIRTUAL_HOST + "/" + testQueue.getQueueName();
List<Map<String, Object>> messages = _restTestHelper.getJsonAsList(queueRelativePath + "/getMessageInfo");
assertEquals("Unexpected number of messages", 1, messages.size());
@@ -311,7 +308,6 @@ public class MessageCompressionTest extends QpidBrokerTestCase
doActualSetUp();
Connection senderConnection = getConnection(true);
- String virtualPath = ((AMQConnectionFactory)getConnectionFactory()).getVirtualPath();
Session session = senderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue testQueue = createTestQueue(session);
@@ -319,7 +315,7 @@ public class MessageCompressionTest extends QpidBrokerTestCase
publishMapMessage(senderConnection, mapToSend, testQueue);
- String queueRelativePath = "queue" + virtualPath + virtualPath + "/" + testQueue.getQueueName();
+ String queueRelativePath = "queue/" + VIRTUAL_HOST + "/" + VIRTUAL_HOST + "/" + testQueue.getQueueName();
List<Map<String, Object>> messages = _restTestHelper.getJsonAsList(queueRelativePath + "/getMessageInfo");
assertEquals("Unexpected number of messages", 1, messages.size());
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/9089861a/systests/src/test/java/org/apache/qpid/systest/management/amqp/AmqpManagementTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/systest/management/amqp/AmqpManagementTest.java b/systests/src/test/java/org/apache/qpid/systest/management/amqp/AmqpManagementTest.java
index 50bc3c1..67f245c 100644
--- a/systests/src/test/java/org/apache/qpid/systest/management/amqp/AmqpManagementTest.java
+++ b/systests/src/test/java/org/apache/qpid/systest/management/amqp/AmqpManagementTest.java
@@ -34,7 +34,6 @@ import java.util.Map;
import java.util.UUID;
import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
import javax.jms.ConnectionMetaData;
import javax.jms.JMSException;
import javax.jms.MapMessage;
@@ -139,11 +138,10 @@ public class AmqpManagementTest extends QpidBrokerTestCase
private void setupBrokerManagementConnection() throws Exception
{
- ConnectionFactory management =
- isBroker10() ? getConnectionFactory("default", "$management", UUID.randomUUID().toString())
- : getConnectionFactory("management");
-
- _connection = management.createConnection(GUEST_USERNAME, GUEST_PASSWORD);
+ _connection = getConnectionBuilder().setVirtualHost("$management")
+ .setTls(true)
+ .setClientId(UUID.randomUUID().toString())
+ .build();
setupSession();
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/9089861a/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
index f6a85fb..8d8e345 100644
--- a/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
+++ b/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
@@ -339,7 +339,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
Connection connection = null;
try
{
- connection = getConnectionFactory("failover").createConnection(GUEST_USERNAME, GUEST_PASSWORD);
+ connection = getConnectionBuilder().setFailover(true).build();
connection.start();
session = connection.createSession(true, Session.SESSION_TRANSACTED);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/9089861a/systests/src/test/java/org/apache/qpid/test/unit/message/UTF8Test.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/test/unit/message/UTF8Test.java b/systests/src/test/java/org/apache/qpid/test/unit/message/UTF8Test.java
index 48d03ac..af460b3 100644
--- a/systests/src/test/java/org/apache/qpid/test/unit/message/UTF8Test.java
+++ b/systests/src/test/java/org/apache/qpid/test/unit/message/UTF8Test.java
@@ -25,15 +25,14 @@ import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
-import java.util.Properties;
import javax.jms.Connection;
import javax.jms.Destination;
+import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
-import javax.naming.InitialContext;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
@@ -91,43 +90,42 @@ public class UTF8Test extends QpidBrokerTestCase
private Destination getReceivingDestination(String exch, String routkey, String qname, final Session session) throws Exception
{
- Properties props = new Properties();
+ createEntitiesUsingAmqpManagement(exch, routkey, qname, session);
if(isBroker10())
{
- props.setProperty("queue.recvDest", qname);
- createTestQueue(session, qname);
+ return session.createQueue(qname);
}
else
{
- props.setProperty("destination.recvDest",
- "direct://" + exch + "//" + qname + "?autodelete='false'&durable='false'"
- + "&routingkey='" + routkey + "'");
+ return session.createQueue("BURL:direct://" + exch + "//" + qname + "?autodelete='false'&durable='false'"
+ + "&routingkey='" + routkey + "'");
}
- // Get our connection context
- InitialContext ctx = new InitialContext(props);
- return (Destination) ctx.lookup("recvDest");
}
private Destination getSendingDestination(String exch, String routkey, String qname, final Session session) throws Exception
{
- Properties props = new Properties();
+ createEntitiesUsingAmqpManagement(exch, routkey, qname, session);
if(isBroker10())
{
- props.setProperty("topic.sendDest", exch +"/" + routkey);
- createEntityUsingAmqpManagement(exch, session, "org.apache.qpid.DirectExchange");
- final Map<String, Object> arguments = new HashMap<>();
- arguments.put("destination",qname);
- arguments.put("bindingKey", routkey);
- performOperationUsingAmqpManagement(exch, "bind", session, "org.apache.qpid.DirectExchange", arguments);
+ return session.createQueue(exch +"/" + routkey);
}
else
{
- props.setProperty("destination.sendDest",
- "direct://" + exch + "//" + qname + "?autodelete='false'&durable='false'"
- + "&routingkey='" + routkey + "'");
+ return session.createQueue("BURL:direct://" + exch + "//" + qname + "?autodelete='false'&durable='false'"
+ + "&routingkey='" + routkey + "'");
}
- // Get our connection context
- InitialContext ctx = new InitialContext(props);
- return (Destination) ctx.lookup("sendDest");
+ }
+
+ private void createEntitiesUsingAmqpManagement(final String exch,
+ final String routkey,
+ final String qname,
+ final Session session) throws JMSException
+ {
+ createEntityUsingAmqpManagement(exch, session, "org.apache.qpid.DirectExchange");
+ createEntityUsingAmqpManagement(qname, session, "org.apache.qpid.Queue");
+ final Map<String, Object> arguments = new HashMap<>();
+ arguments.put("destination",qname);
+ arguments.put("bindingKey", routkey);
+ performOperationUsingAmqpManagement(exch, "bind", session, "org.apache.qpid.DirectExchange", arguments);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/9089861a/systests/src/test/java/org/apache/qpid/test/utils/FailoverBaseCase.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/test/utils/FailoverBaseCase.java b/systests/src/test/java/org/apache/qpid/test/utils/FailoverBaseCase.java
index 3d93892..b781058 100644
--- a/systests/src/test/java/org/apache/qpid/test/utils/FailoverBaseCase.java
+++ b/systests/src/test/java/org/apache/qpid/test/utils/FailoverBaseCase.java
@@ -46,7 +46,6 @@ public class FailoverBaseCase extends QpidBrokerTestCase implements ConnectionLi
protected BrokerHolder _alternativeBroker;
protected int _port;
protected int _alternativePort;
- private ConnectionFactory _connectionFactory;
private final List<Connection> _connections = new ArrayList<>();
@Override
@@ -97,18 +96,9 @@ public class FailoverBaseCase extends QpidBrokerTestCase implements ConnectionLi
public ConnectionFactory getConnectionFactory() throws NamingException
{
LOGGER.info("get ConnectionFactory");
- if (_connectionFactory == null)
- {
- if (Boolean.getBoolean("profile.use_ssl"))
- {
- _connectionFactory = getConnectionFactory("failover.ssl");
- }
- else
- {
- _connectionFactory = getConnectionFactory("failover");
- }
- }
- return _connectionFactory;
+ return getConnectionBuilder().setFailover(true)
+ .setTls(Boolean.getBoolean("profile.use_ssl"))
+ .buildConnectionFactory();
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/9089861a/test-profiles/Java10UninvestigatedTestsExcludes
----------------------------------------------------------------------
diff --git a/test-profiles/Java10UninvestigatedTestsExcludes b/test-profiles/Java10UninvestigatedTestsExcludes
index 82e8a89..349fb71 100644
--- a/test-profiles/Java10UninvestigatedTestsExcludes
+++ b/test-profiles/Java10UninvestigatedTestsExcludes
@@ -22,6 +22,9 @@
org.apache.qpid.client.prefetch.PrefetchBehaviourTest#*
+QPID-XXXX: It could be a broker bug. The issue requires further inevestigation
+org.apache.qpid.systest.AnonymousProducerTest#testPublishIntoNonExistingQueue
+org.apache.qpid.test.client.queue.QueuePolicyTest#testRejectPolicyMessageDepth
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[3/7] qpid-broker-j git commit: QPID-6933: [System Tests] Change amqp
protocol version defined in maven profiles to be spec complient
Posted by or...@apache.org.
QPID-6933: [System Tests] Change amqp protocol version defined in maven profiles to be spec complient
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/c47109b4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/c47109b4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/c47109b4
Branch: refs/heads/master
Commit: c47109b41dbeda51159ae5ce22666e5cee8eedab
Parents: 0b7666e
Author: Alex Rudyy <or...@apache.org>
Authored: Sat Nov 25 20:17:38 2017 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Tue Nov 28 07:37:08 2017 +0000
----------------------------------------------------------------------
pom.xml | 40 ++++++++++----------
.../qpid/test/utils/QpidBrokerTestCase.java | 2 +-
2 files changed, 21 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c47109b4/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0c0c445..d2e3087 100644
--- a/pom.xml
+++ b/pom.xml
@@ -105,7 +105,7 @@
<profile.broker.command.windows>"${qpid.home}${file.separator}bin${file.separator}qpid-server.bat" -sp "${at.sign}STORE_PATH" -st ${at.sign}STORE_TYPE -prop test.port=${at.sign}PORT -prop "qpid.work_dir=${at.sign}QPID_WORK"</profile.broker.command.windows>
<profile.test.excludes>Excludes JavaExcludes ${profile}.excludes ${profile.specific.excludes}</profile.test.excludes>
<profile.specific.excludes>JavaTransientExcludes Java010Excludes</profile.specific.excludes>
- <profile.broker.version>v0_10</profile.broker.version>
+ <profile.broker.version>1.0</profile.broker.version>
<profile.test.amqp_port_protocols>["AMQP_0_8","AMQP_0_9","AMQP_0_9_1","AMQP_0_10"]</profile.test.amqp_port_protocols>
<profile.broker.persistent>false</profile.broker.persistent>
<profile.virtualhostnode.type>Memory</profile.virtualhostnode.type>
@@ -1007,7 +1007,7 @@
<properties>
<profile>java-mms.0-10</profile>
<profile.specific.excludes>JavaTransientExcludes Java010Excludes</profile.specific.excludes>
- <profile.broker.version>v0_10</profile.broker.version>
+ <profile.broker.version>0-10</profile.broker.version>
<profile.test.amqp_port_protocols>["AMQP_0_8","AMQP_0_9","AMQP_0_9_1","AMQP_0_10"]</profile.test.amqp_port_protocols>
<profile.broker.persistent>false</profile.broker.persistent>
<profile.virtualhostnode.type>Memory</profile.virtualhostnode.type>
@@ -1027,7 +1027,7 @@
<properties>
<profile>java-mms.0-9-1</profile>
<profile.specific.excludes>JavaTransientExcludes XAExcludes JavaPre010Excludes</profile.specific.excludes>
- <profile.broker.version>v0_9_1</profile.broker.version>
+ <profile.broker.version>0-9-1</profile.broker.version>
<profile.test.amqp_port_protocols>["AMQP_0_8","AMQP_0_9","AMQP_0_9_1"]</profile.test.amqp_port_protocols>
<profile.broker.persistent>false</profile.broker.persistent>
<profile.virtualhostnode.type>Memory</profile.virtualhostnode.type>
@@ -1047,7 +1047,7 @@
<properties>
<profile>java-mms.0-9</profile>
<profile.specific.excludes>JavaTransientExcludes XAExcludes JavaPre010Excludes</profile.specific.excludes>
- <profile.broker.version>v0_9</profile.broker.version>
+ <profile.broker.version>0-9</profile.broker.version>
<profile.test.amqp_port_protocols>["AMQP_0_8","AMQP_0_9"]</profile.test.amqp_port_protocols>
<profile.broker.persistent>false</profile.broker.persistent>
<profile.virtualhostnode.type>Memory</profile.virtualhostnode.type>
@@ -1067,7 +1067,7 @@
<properties>
<profile>java-bdb.0-10</profile>
<profile.specific.excludes>JavaPersistentExcludes Java010Excludes JavaBDBExcludes</profile.specific.excludes>
- <profile.broker.version>v0_10</profile.broker.version>
+ <profile.broker.version>0-10</profile.broker.version>
<profile.test.amqp_port_protocols>["AMQP_0_8","AMQP_0_9","AMQP_0_9_1","AMQP_0_10"]</profile.test.amqp_port_protocols>
<profile.broker.persistent>true</profile.broker.persistent>
<profile.virtualhostnode.type>BDB</profile.virtualhostnode.type>
@@ -1086,7 +1086,7 @@
<properties>
<profile>java-bdb.0-9-1</profile>
<profile.specific.excludes>JavaPersistentExcludes XAExcludes JavaPre010Excludes JavaBDBExcludes</profile.specific.excludes>
- <profile.broker.version>v0_9_1</profile.broker.version>
+ <profile.broker.version>0-9-1</profile.broker.version>
<profile.test.amqp_port_protocols>["AMQP_0_8","AMQP_0_9","AMQP_0_9_1"]</profile.test.amqp_port_protocols>
<profile.broker.persistent>true</profile.broker.persistent>
<profile.virtualhostnode.type>BDB</profile.virtualhostnode.type>
@@ -1105,7 +1105,7 @@
<properties>
<profile>java-bdb.0-9</profile>
<profile.specific.excludes>JavaPersistentExcludes XAExcludes JavaPre010Excludes JavaBDBExcludes</profile.specific.excludes>
- <profile.broker.version>v0_9</profile.broker.version>
+ <profile.broker.version>0-9</profile.broker.version>
<profile.test.amqp_port_protocols>["AMQP_0_8","AMQP_0_9"]</profile.test.amqp_port_protocols>
<profile.broker.persistent>true</profile.broker.persistent>
<profile.virtualhostnode.type>BDB</profile.virtualhostnode.type>
@@ -1124,7 +1124,7 @@
<properties>
<profile>java-dby-mem.0-10</profile>
<profile.specific.excludes>JavaPersistentExcludes JavaDerbyExcludes Java010Excludes</profile.specific.excludes>
- <profile.broker.version>v0_10</profile.broker.version>
+ <profile.broker.version>0-10</profile.broker.version>
<profile.test.amqp_port_protocols>["AMQP_0_8","AMQP_0_9","AMQP_0_9_1","AMQP_0_10"]</profile.test.amqp_port_protocols>
<profile.broker.persistent>true</profile.broker.persistent>
<profile.virtualhostnode.type>DERBY</profile.virtualhostnode.type>
@@ -1143,7 +1143,7 @@
<properties>
<profile>java-dby-mem.0-9-1</profile>
<profile.specific.excludes>JavaPersistentExcludes JavaDerbyExcludes XAExcludes JavaPre010Excludes</profile.specific.excludes>
- <profile.broker.version>v0_9_1</profile.broker.version>
+ <profile.broker.version>0-9-1</profile.broker.version>
<profile.test.amqp_port_protocols>["AMQP_0_8","AMQP_0_9","AMQP_0_9_1"]</profile.test.amqp_port_protocols>
<profile.broker.persistent>true</profile.broker.persistent>
<profile.virtualhostnode.type>DERBY</profile.virtualhostnode.type>
@@ -1162,7 +1162,7 @@
<properties>
<profile>java-dby-mem.0-9</profile>
<profile.specific.excludes>JavaPersistentExcludes JavaDerbyExcludes XAExcludes JavaPre010Excludes</profile.specific.excludes>
- <profile.broker.version>v0_9</profile.broker.version>
+ <profile.broker.version>0-9</profile.broker.version>
<profile.test.amqp_port_protocols>["AMQP_0_8","AMQP_0_9"]</profile.test.amqp_port_protocols>
<profile.broker.persistent>true</profile.broker.persistent>
<profile.virtualhostnode.type>DERBY</profile.virtualhostnode.type>
@@ -1181,7 +1181,7 @@
<properties>
<profile>java-dby.0-10</profile>
<profile.specific.excludes>JavaPersistentExcludes JavaDerbyExcludes Java010Excludes</profile.specific.excludes>
- <profile.broker.version>v0_10</profile.broker.version>
+ <profile.broker.version>0-10</profile.broker.version>
<profile.test.amqp_port_protocols>["AMQP_0_8","AMQP_0_9","AMQP_0_9_1","AMQP_0_10"]</profile.test.amqp_port_protocols>
<profile.broker.persistent>true</profile.broker.persistent>
<profile.virtualhostnode.type>DERBY</profile.virtualhostnode.type>
@@ -1201,7 +1201,7 @@
<properties>
<profile>java-dby.0-9-1</profile>
<profile.specific.excludes>JavaPersistentExcludes JavaDerbyExcludes XAExcludes JavaPre010Excludes</profile.specific.excludes>
- <profile.broker.version>v0_9_1</profile.broker.version>
+ <profile.broker.version>0-9-1</profile.broker.version>
<profile.test.amqp_port_protocols>["AMQP_0_8","AMQP_0_9","AMQP_0_9_1"]</profile.test.amqp_port_protocols>
<profile.broker.persistent>true</profile.broker.persistent>
<profile.virtualhostnode.type>DERBY</profile.virtualhostnode.type>
@@ -1221,7 +1221,7 @@
<properties>
<profile>java-dby.0-9</profile>
<profile.specific.excludes>JavaPersistentExcludes JavaDerbyExcludes XAExcludes JavaPre010Excludes</profile.specific.excludes>
- <profile.broker.version>v0_9</profile.broker.version>
+ <profile.broker.version>0-9</profile.broker.version>
<profile.test.amqp_port_protocols>["AMQP_0_8","AMQP_0_9"]</profile.test.amqp_port_protocols>
<profile.broker.persistent>true</profile.broker.persistent>
<profile.virtualhostnode.type>DERBY</profile.virtualhostnode.type>
@@ -1242,7 +1242,7 @@
<properties>
<profile>java-mms.1-0</profile>
<profile.specific.excludes>JavaTransientExcludes Java10Excludes Java10BrokenTestsExcludes Java10UninvestigatedTestsExcludes</profile.specific.excludes>
- <profile.broker.version>v1_0</profile.broker.version>
+ <profile.broker.version>1.0</profile.broker.version>
<profile.test.amqp_port_protocols>["AMQP_0_8","AMQP_0_9","AMQP_0_9_1","AMQP_0_10","AMQP_1_0"]</profile.test.amqp_port_protocols>
<profile.broker.persistent>false</profile.broker.persistent>
<profile.virtualhostnode.type>Memory</profile.virtualhostnode.type>
@@ -1264,7 +1264,7 @@
<properties>
<profile>java-bdb.1-0</profile>
<profile.specific.excludes>JavaPersistentExcludes JavaBDBExcludes Java10Excludes Java10BrokenTestsExcludes Java10UninvestigatedTestsExcludes</profile.specific.excludes>
- <profile.broker.version>v1_0</profile.broker.version>
+ <profile.broker.version>1.0</profile.broker.version>
<profile.test.amqp_port_protocols>["AMQP_1_0"]</profile.test.amqp_port_protocols>
<profile.broker.persistent>true</profile.broker.persistent>
<profile.virtualhostnode.type>BDB</profile.virtualhostnode.type>
@@ -1285,7 +1285,7 @@
<properties>
<profile>java-dby.1-0</profile>
<profile.specific.excludes>JavaPersistentExcludes JavaDerbyExcludes Java10Excludes Java10BrokenTestsExcludes Java10UninvestigatedTestsExcludes</profile.specific.excludes>
- <profile.broker.version>v1_0</profile.broker.version>
+ <profile.broker.version>1.0</profile.broker.version>
<profile.test.amqp_port_protocols>["AMQP_1_0"]</profile.test.amqp_port_protocols>
<profile.broker.persistent>true</profile.broker.persistent>
<profile.virtualhostnode.type>DERBY</profile.virtualhostnode.type>
@@ -1311,7 +1311,7 @@
<properties>
<profile>java-json.1-0</profile>
<profile.specific.excludes>JavaPersistentExcludes JavaJsonExcludes Java10Excludes Java10BrokenTestsExcludes Java10UninvestigatedTestsExcludes</profile.specific.excludes>
- <profile.broker.version>v1_0</profile.broker.version>
+ <profile.broker.version>1.0</profile.broker.version>
<profile.test.amqp_port_protocols>["AMQP_1_0"]</profile.test.amqp_port_protocols>
<profile.broker.persistent>true</profile.broker.persistent>
<profile.virtualhostnode.type>JSON</profile.virtualhostnode.type>
@@ -1331,7 +1331,7 @@
<properties>
<profile>java-json.0-9-1</profile>
<profile.specific.excludes>JavaPersistentExcludes JavaJsonExcludes XAExcludes JavaPre010Excludes</profile.specific.excludes>
- <profile.broker.version>v0_9_1</profile.broker.version>
+ <profile.broker.version>0-9-1</profile.broker.version>
<profile.test.amqp_port_protocols>["AMQP_0_8","AMQP_0_9","AMQP_0_9_1"]</profile.test.amqp_port_protocols>
<profile.broker.persistent>true</profile.broker.persistent>
<profile.virtualhostnode.type>JSON</profile.virtualhostnode.type>
@@ -1350,7 +1350,7 @@
<properties>
<profile>java-json.0-10</profile>
<profile.specific.excludes>JavaPersistentExcludes JavaJsonExcludes XAExcludes Java010Excludes</profile.specific.excludes>
- <profile.broker.version>v0_10</profile.broker.version>
+ <profile.broker.version>0-10</profile.broker.version>
<profile.test.amqp_port_protocols>["AMQP_0_8","AMQP_0_9","AMQP_0_9_1","AMQP_0_10"]</profile.test.amqp_port_protocols>
<profile.broker.persistent>true</profile.broker.persistent>
<profile.virtualhostnode.type>JSON</profile.virtualhostnode.type>
@@ -1380,7 +1380,7 @@
<profile.broker.command.windows />
<profile.test.excludes>Excludes CPPExcludes ${profile}.excludes ${profile.specific.excludes} cpp.excludes</profile.test.excludes>
<profile.specific.excludes>CPPPrefetchExcludes CPPTransientExcludes</profile.specific.excludes>
- <profile.broker.version>v0_10</profile.broker.version>
+ <profile.broker.version>0-10</profile.broker.version>
<profile.broker.persistent>false</profile.broker.persistent>
</properties>
<build>
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c47109b4/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
----------------------------------------------------------------------
diff --git a/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
index 6a9b6a0..4aec884 100755
--- a/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
+++ b/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
@@ -73,7 +73,7 @@ public class QpidBrokerTestCase extends QpidTestCase
private static final Boolean BROKER_CLEAN_BETWEEN_TESTS = Boolean.getBoolean("broker.clean.between.tests");
private static final Boolean BROKER_PERSISTENT = Boolean.getBoolean("broker.persistent");
private static final Protocol BROKER_PROTOCOL =
- Protocol.valueOf("AMQP_" + System.getProperty("broker.version", "v0_9").substring(1));
+ Protocol.valueOf("AMQP_" + System.getProperty("broker.version", "0-9-1").replace('-', '_').replace('.', '_'));
private static List<BrokerHolder> _brokerList = new ArrayList<>();
private final Map<String, String> _propertiesSetForBroker = new HashMap<>();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/7] qpid-broker-j git commit: QPID-6933: [System Tests] Add module
for JMS 1.1 system tests and start moving JMS 1.1 tests into it
Posted by or...@apache.org.
QPID-6933: [System Tests] Add module for JMS 1.1 system tests and start moving JMS 1.1 tests into it
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/59218fdc
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/59218fdc
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/59218fdc
Branch: refs/heads/master
Commit: 59218fdc3eeaa6ef1c7f535bcbb29edbbc9cb962
Parents: 7336b52
Author: Alex Rudyy <or...@apache.org>
Authored: Tue Nov 28 07:36:42 2017 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Tue Nov 28 07:37:08 2017 +0000
----------------------------------------------------------------------
pom.xml | 1 +
systests/qpid-systests-jms_1.1/pom.xml | 117 +++
.../qpid/systests/jms_1_1/Jms1TestBase.java | 147 ++++
.../src/main/resources/config-jms1-tests.json | 98 +++
.../jms_1_1/topic/DurableSubscribtionTest.java | 881 +++++++++++++++++++
.../test/unit/ct/DurableSubscriberTest.java | 426 ---------
.../unit/topic/DurableSubscriptionTest.java | 425 +--------
test-profiles/CPPTransientExcludes | 3 -
test-profiles/JavaPre010Excludes | 3 -
9 files changed, 1252 insertions(+), 849 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/59218fdc/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3fe6f46..b53c5ee 100644
--- a/pom.xml
+++ b/pom.xml
@@ -193,6 +193,7 @@
<module>qpid-test-utils</module>
<module>systests</module>
<module>systests/systests-utils</module>
+ <module>systests/qpid-systests-jms_1.1</module>
<module>systests/qpid-systests-jms_2.0</module>
<module>systests/protocol-tests-core</module>
<module>systests/protocol-tests-amqp-0-8</module>
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/59218fdc/systests/qpid-systests-jms_1.1/pom.xml
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_1.1/pom.xml b/systests/qpid-systests-jms_1.1/pom.xml
new file mode 100644
index 0000000..a4b455d
--- /dev/null
+++ b/systests/qpid-systests-jms_1.1/pom.xml
@@ -0,0 +1,117 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-systests-parent</artifactId>
+ <version>7.1.0-SNAPSHOT</version>
+ <relativePath>../../qpid-systests-parent/pom.xml</relativePath>
+ </parent>
+
+ <artifactId>qpid-systests-jms_1.1</artifactId>
+ <name>Apache Qpid Broker-J JMS 1.1 System Tests</name>
+ <description>JMS 1.1 system tests</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-systests</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-bdbstore</artifactId>
+ <scope>test</scope>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-jms_1.1_spec</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-broker-plugins-amqp-1-0-protocol-bdb-link-store</artifactId>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-systests-utils</artifactId>
+ </dependency>
+
+ </dependencies>
+
+ <profiles>
+ <profile>
+ <id>addQpidJmsClientIfNecessary</id>
+ <activation>
+ <property>
+ <name>!enableAmqp0-x</name>
+ </property>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-jms-client</artifactId>
+ </dependency>
+ </dependencies>
+ </profile>
+
+ <profile>
+ <id>excludesTestsIfNotAmqp1-0</id>
+ <activation>
+ <property>
+ <name>enableAmqp0-x</name>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**/*</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+
+ </profiles>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <systemPropertyVariables>
+ <qpid.initialConfigurationLocation>classpath:config-jms1-tests.json</qpid.initialConfigurationLocation>
+ <qpid.amqp.version>${profile.broker.version}</qpid.amqp.version>
+ </systemPropertyVariables>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/59218fdc/systests/qpid-systests-jms_1.1/src/main/java/org/apache/qpid/systests/jms_1_1/Jms1TestBase.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_1.1/src/main/java/org/apache/qpid/systests/jms_1_1/Jms1TestBase.java b/systests/qpid-systests-jms_1.1/src/main/java/org/apache/qpid/systests/jms_1_1/Jms1TestBase.java
new file mode 100644
index 0000000..a19bc08
--- /dev/null
+++ b/systests/qpid-systests-jms_1.1/src/main/java/org/apache/qpid/systests/jms_1_1/Jms1TestBase.java
@@ -0,0 +1,147 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systests.jms_1_1;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.naming.NamingException;
+
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+
+import org.apache.qpid.test.utils.AmqpManagementFacade;
+import org.apache.qpid.test.utils.ConnectionBuilder;
+import org.apache.qpid.test.utils.JmsProvider;
+import org.apache.qpid.test.utils.QpidJmsClient0xProvider;
+import org.apache.qpid.test.utils.QpidJmsClientProvider;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+
+public abstract class Jms1TestBase extends BrokerAdminUsingTestBase
+{
+ private static JmsProvider _jmsProvider;
+ private static AmqpManagementFacade _managementFacade;
+
+ @Rule
+ public final TestName _testName = new TestName();
+
+ @BeforeClass
+ public static void setUpTestBase()
+ {
+ if ("1.0".equals(System.getProperty("broker.version", "1.0")))
+ {
+ _managementFacade = new AmqpManagementFacade("$management");
+ _jmsProvider = new QpidJmsClientProvider(_managementFacade);
+ }
+ else
+ {
+ _managementFacade = new AmqpManagementFacade("ADDR:$management");
+ _jmsProvider = new QpidJmsClient0xProvider(_managementFacade);
+ }
+ }
+
+ protected ConnectionBuilder getConnectionBuilder()
+ {
+ InetSocketAddress brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
+ return _jmsProvider.getConnectionBuilder()
+ .setHost(brokerAddress.getHostName())
+ .setPort(brokerAddress.getPort())
+ .setUsername(getBrokerAdmin().getValidUsername())
+ .setPassword(getBrokerAdmin().getValidPassword())
+ ;
+ }
+
+ protected void createEntityUsingAmqpManagement(final String entityName,
+ final String entityType,
+ final Map<String, Object> attributes)
+ throws Exception
+ {
+ Connection connection = getConnection();
+ try
+ {
+ connection.start();
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ _managementFacade.createEntityUsingAmqpManagement(entityName, session, entityType, attributes);
+ }
+ finally
+ {
+ connection.close();
+ }
+ }
+
+ protected Object performOperationUsingAmqpManagement(final String name,
+ final String operation,
+ final String type,
+ Map<String, Object> arguments)
+ throws Exception
+ {
+ Connection connection = getConnection();
+ try
+ {
+ connection.start();
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ return _managementFacade.performOperationUsingAmqpManagement(name, operation, session, type, arguments);
+ }
+ finally
+ {
+ connection.close();
+ }
+ }
+
+ protected Connection getConnection() throws JMSException, NamingException
+ {
+ return getConnectionBuilder().build();
+ }
+
+ protected long getReceiveTimeout()
+ {
+ return Long.getLong("qpid.test_receive_timeout", 1000L);
+ }
+
+ protected String getVirtualHostName()
+ {
+ return getClass().getSimpleName() + "_" + _testName.getMethodName();
+ }
+
+ protected String getTestName()
+ {
+ return _testName.getMethodName();
+ }
+
+ protected Topic createTopic(final String topicName) throws Exception
+ {
+ Connection connection = getConnection();
+ try
+ {
+ return _jmsProvider.createTopic(connection, topicName);
+ }
+ finally
+ {
+ connection.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/59218fdc/systests/qpid-systests-jms_1.1/src/main/resources/config-jms1-tests.json
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_1.1/src/main/resources/config-jms1-tests.json b/systests/qpid-systests-jms_1.1/src/main/resources/config-jms1-tests.json
new file mode 100644
index 0000000..a578f3b
--- /dev/null
+++ b/systests/qpid-systests-jms_1.1/src/main/resources/config-jms1-tests.json
@@ -0,0 +1,98 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+{
+ "name" : "${broker.name}",
+ "modelVersion" : "7.0",
+ "authenticationproviders" : [ {
+ "name" : "anon",
+ "type" : "Anonymous"
+ }, {
+ "name" : "plain",
+ "type" : "Plain",
+ "secureOnlyMechanisms" : [],
+ "users" : [ {
+ "name" : "admin",
+ "type" : "managed",
+ "password" : "admin"
+ }, {
+ "name" : "guest",
+ "type" : "managed",
+ "password" : "guest"
+ } ]
+ } ],
+ "ports" : [ {
+ "name" : "AMQP",
+ "type" : "AMQP",
+ "authenticationProvider" : "plain",
+ "port" : "0",
+ "protocols" : [ "AMQP_0_8", "AMQP_0_9", "AMQP_0_9_1", "AMQP_0_10", "AMQP_1_0" ],
+ "virtualhostaliases" : [ {
+ "name" : "defaultAlias",
+ "type" : "defaultAlias"
+ }, {
+ "name" : "hostnameAlias",
+ "type" : "hostnameAlias"
+ }, {
+ "name" : "nameAlias",
+ "type" : "nameAlias"
+ } ]
+ }, {
+ "name" : "ANONYMOUS_AMQP",
+ "type" : "AMQP",
+ "authenticationProvider" : "anon",
+ "port" : "0",
+ "protocols" : [ "AMQP_0_8", "AMQP_0_9", "AMQP_0_9_1", "AMQP_0_10", "AMQP_1_0" ],
+ "virtualhostaliases" : [ {
+ "name" : "defaultAlias",
+ "type" : "defaultAlias",
+ "durable" : true
+ }, {
+ "name" : "hostnameAlias",
+ "type" : "hostnameAlias",
+ "durable" : true
+ }, {
+ "name" : "nameAlias",
+ "type" : "nameAlias",
+ "durable" : true
+ } ]
+ }, {
+ "name" : "ANONYMOUS_AMQPWS",
+ "type" : "AMQP",
+ "authenticationProvider" : "anon",
+ "port" : "0",
+ "transports" : ["WS"],
+ "protocols" : [ "AMQP_1_0" ],
+ "virtualhostaliases" : [ {
+ "name" : "defaultAlias",
+ "type" : "defaultAlias",
+ "durable" : true
+ }, {
+ "name" : "hostnameAlias",
+ "type" : "hostnameAlias",
+ "durable" : true
+ }, {
+ "name" : "nameAlias",
+ "type" : "nameAlias",
+ "durable" : true
+ } ]
+ } ],
+ "virtualhostnodes" : []
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/59218fdc/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/topic/DurableSubscribtionTest.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/topic/DurableSubscribtionTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/topic/DurableSubscribtionTest.java
new file mode 100644
index 0000000..6fb54cc
--- /dev/null
+++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/topic/DurableSubscribtionTest.java
@@ -0,0 +1,881 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.systests.jms_1_1.topic;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeThat;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+
+import org.junit.Test;
+
+import org.apache.qpid.systests.jms_1_1.Jms1TestBase;
+
+public class DurableSubscribtionTest extends Jms1TestBase
+{
+ @Test
+ public void publishedMessagesAreSavedAfterSubscriberClose() throws Exception
+ {
+ Topic topic = createTopic(getTestName());
+ String subscriptionName = getTestName() + "_sub";
+ String clientId = "testClientId";
+
+ TopicConnection connection = (TopicConnection) getConnectionBuilder().setClientId(clientId).build();
+ try
+ {
+ Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = producerSession.createProducer(topic);
+
+ Session durableSubscriberSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+ TopicSubscriber durableSubscriber =
+ durableSubscriberSession.createDurableSubscriber(topic, subscriptionName);
+
+ connection.start();
+
+ producer.send(producerSession.createTextMessage("A"));
+
+ Message message = durableSubscriber.receive(getReceiveTimeout());
+ assertTrue(message instanceof TextMessage);
+ assertEquals("A", ((TextMessage) message).getText());
+
+ durableSubscriberSession.commit();
+
+ producer.send(producerSession.createTextMessage("B"));
+
+ message = durableSubscriber.receive(getReceiveTimeout());
+ assertTrue(message instanceof TextMessage);
+ assertEquals("B", ((TextMessage) message).getText());
+
+ durableSubscriberSession.rollback();
+
+ durableSubscriber.close();
+ durableSubscriberSession.close();
+
+ producer.send(producerSession.createTextMessage("C"));
+ }
+ finally
+ {
+ connection.close();
+ }
+
+ if (getBrokerAdmin().supportsRestart())
+ {
+ getBrokerAdmin().restart();
+ }
+
+ TopicConnection connection2 = (TopicConnection) getConnectionBuilder().setClientId(clientId).build();
+ try
+ {
+ connection2.start();
+ final Session durableSubscriberSession = connection2.createSession(true, Session.SESSION_TRANSACTED);
+ final TopicSubscriber durableSubscriber =
+ durableSubscriberSession.createDurableSubscriber(topic, subscriptionName);
+
+ final List<String> expectedMessages = Arrays.asList("B", "C");
+ for (String expectedMessageText : expectedMessages)
+ {
+ final Message message = durableSubscriber.receive(getReceiveTimeout());
+ assertTrue(message instanceof TextMessage);
+ assertEquals(expectedMessageText, ((TextMessage) message).getText());
+
+ durableSubscriberSession.commit();
+ }
+
+ durableSubscriber.close();
+ durableSubscriberSession.unsubscribe(subscriptionName);
+ }
+ finally
+ {
+ connection2.close();
+ }
+ }
+
+ @Test
+ public void testUnsubscribe() throws Exception
+ {
+ Topic topic = createTopic(getTestName());
+ String subscriptionName = getTestName() + "_sub";
+ String clientId = "clientId";
+ int numberOfQueuesBeforeTest = getQueueCount();
+
+ Connection connection = getConnectionBuilder().setClientId(clientId).build();
+ try
+ {
+ Session durableSubscriberSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Session nonDurableSubscriberSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer subscriber = nonDurableSubscriberSession.createConsumer(topic);
+ MessageProducer producer = producerSession.createProducer(topic);
+ TopicSubscriber durableSubscriber =
+ durableSubscriberSession.createDurableSubscriber(topic, subscriptionName);
+
+ connection.start();
+ producer.send(nonDurableSubscriberSession.createTextMessage("A"));
+
+ Message message = subscriber.receive(getReceiveTimeout());
+ assertTrue(message instanceof TextMessage);
+ assertEquals("A", ((TextMessage) message).getText());
+
+ message = durableSubscriber.receive(getReceiveTimeout());
+ assertTrue(message instanceof TextMessage);
+ assertEquals("A", ((TextMessage) message).getText());
+
+ nonDurableSubscriberSession.commit();
+ durableSubscriberSession.commit();
+
+ durableSubscriber.close();
+ durableSubscriberSession.unsubscribe(subscriptionName);
+
+ producer.send(nonDurableSubscriberSession.createTextMessage("B"));
+
+ Session durableSubscriberSession2 = connection.createSession(true, Session.SESSION_TRANSACTED);
+ TopicSubscriber durableSubscriber2 =
+ durableSubscriberSession2.createDurableSubscriber(topic, subscriptionName);
+
+ producer.send(nonDurableSubscriberSession.createTextMessage("C"));
+
+ message = subscriber.receive(getReceiveTimeout());
+ assertTrue(message instanceof TextMessage);
+ assertEquals("B", ((TextMessage) message).getText());
+
+ message = subscriber.receive(getReceiveTimeout());
+ assertTrue(message instanceof TextMessage);
+ assertEquals("C", ((TextMessage) message).getText());
+
+ message = durableSubscriber2.receive(getReceiveTimeout());
+ assertTrue(message instanceof TextMessage);
+ assertEquals("C", ((TextMessage) message).getText());
+
+ nonDurableSubscriberSession.commit();
+ durableSubscriberSession2.commit();
+
+ assertEquals("Message count should be 0", 0, getTotalDepthOfQueuesMessages());
+
+ durableSubscriber2.close();
+ durableSubscriberSession2.unsubscribe(subscriptionName);
+ }
+ finally
+ {
+ connection.close();
+ }
+
+ int numberOfQueuesAfterTest = getQueueCount();
+ assertEquals("Unexpected number of queues", numberOfQueuesBeforeTest, numberOfQueuesAfterTest);
+ }
+
+ /**
+ * <ul>
+ * <li>create and register a durable subscriber with no message selector
+ * <li>try to create another durable with the same name, should fail
+ * </ul>
+ * <p>
+ * QPID-2418
+ */
+ @Test
+ public void multipleSubscribersWithTheSameName() throws Exception
+ {
+ String subscriptionName = getTestName() + "_sub";
+ Topic topic = createTopic(subscriptionName);
+ Connection conn = getConnection();
+ try
+ {
+ conn.start();
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // create and register a durable subscriber with no message selector
+ session.createDurableSubscriber(topic, subscriptionName, null, false);
+
+ // try to recreate the durable subscriber
+ try
+ {
+ session.createDurableSubscriber(topic, subscriptionName, null, false);
+ fail("Subscription should not have been created");
+ }
+ catch (JMSException e)
+ {
+ // pass
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+
+ public void testDurableSubscribeWithTemporaryTopic() throws Exception
+ {
+ Connection connection = getConnection();
+ try
+ {
+ connection.start();
+ Session ssn = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Topic topic = ssn.createTemporaryTopic();
+ try
+ {
+ ssn.createDurableSubscriber(topic, "test");
+ fail("expected InvalidDestinationException");
+ }
+ catch (InvalidDestinationException ex)
+ {
+ // this is expected
+ }
+ try
+ {
+ ssn.createDurableSubscriber(topic, "test", null, false);
+ fail("expected InvalidDestinationException");
+ }
+ catch (InvalidDestinationException ex)
+ {
+ // this is expected
+ }
+ }
+ finally
+ {
+ connection.close();
+ }
+ }
+
+ @Test
+ public void noLocalMessagesNotDelivered() throws Exception
+ {
+ String noLocalSubscriptionName = getTestName() + "_no_local_sub";
+ Topic topic = createTopic(getTestName());
+ String clientId = "testClientId";
+
+ Connection connection = getConnectionBuilder().setClientId(clientId).build();
+ try
+ {
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ MessageProducer sessionProducer = session.createProducer(topic);
+
+ Connection noLocalConnection = getConnectionBuilder().setClientId(clientId).build();
+ try
+ {
+ Session noLocalSession = noLocalConnection.createSession(true, Session.SESSION_TRANSACTED);
+ MessageProducer noLocalSessionProducer = noLocalSession.createProducer(topic);
+
+ TopicSubscriber noLocalSubscriber =
+ noLocalSession.createDurableSubscriber(topic, noLocalSubscriptionName, null, true);
+ noLocalConnection.start();
+ connection.start();
+
+ noLocalSessionProducer.send(noLocalSession.createTextMessage("Message1"));
+ noLocalSession.commit();
+ sessionProducer.send(session.createTextMessage("Message2"));
+ session.commit();
+
+ Message durableSubscriberMessage = noLocalSubscriber.receive(getReceiveTimeout());
+ assertTrue(durableSubscriberMessage instanceof TextMessage);
+ assertEquals("Unexpected local message received",
+ "Message2",
+ ((TextMessage) durableSubscriberMessage).getText());
+ noLocalSession.commit();
+ }
+ finally
+ {
+ noLocalConnection.close();
+ }
+
+ Connection noLocalConnection2 = getConnectionBuilder().setClientId(clientId).build();
+ try
+ {
+ Session noLocalSession = noLocalConnection2.createSession(true, Session.SESSION_TRANSACTED);
+ noLocalConnection2.start();
+ TopicSubscriber noLocalSubscriber =
+ noLocalSession.createDurableSubscriber(topic, noLocalSubscriptionName, null, true);
+ try
+ {
+ sessionProducer.send(session.createTextMessage("Message3"));
+ session.commit();
+
+ final Message durableSubscriberMessage = noLocalSubscriber.receive(getReceiveTimeout());
+ assertTrue(durableSubscriberMessage instanceof TextMessage);
+ assertEquals("Unexpected local message received",
+ "Message3",
+ ((TextMessage) durableSubscriberMessage).getText());
+ noLocalSession.commit();
+ }
+ finally
+ {
+ noLocalSubscriber.close();
+ noLocalSession.unsubscribe(noLocalSubscriptionName);
+ }
+ }
+ finally
+ {
+ noLocalConnection2.close();
+ }
+ }
+ finally
+ {
+ connection.close();
+ }
+ }
+
+ /**
+ * Tests that messages are delivered normally to a subscriber on a separate connection despite
+ * the use of durable subscriber with no-local on the first connection.
+ */
+ @Test
+ public void testNoLocalSubscriberAndSubscriberOnSeparateConnection() throws Exception
+ {
+ String noLocalSubscriptionName = getTestName() + "_no_local_sub";
+ String subscriobtionName = getTestName() + "_sub";
+ Topic topic = createTopic(getTestName());
+ final String clientId = "clientId";
+
+ Connection noLocalConnection = getConnectionBuilder().setClientId(clientId).build();
+ try
+ {
+ Connection connection = getConnection();
+ try
+ {
+ Session noLocalSession = noLocalConnection.createSession(true, Session.SESSION_TRANSACTED);
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+ MessageProducer noLocalSessionProducer = noLocalSession.createProducer(topic);
+ MessageProducer sessionProducer = session.createProducer(topic);
+
+ try
+ {
+ TopicSubscriber noLocalSubscriber =
+ noLocalSession.createDurableSubscriber(topic, noLocalSubscriptionName, null, true);
+ TopicSubscriber subscriber = session.createDurableSubscriber(topic, subscriobtionName, null, false);
+ noLocalConnection.start();
+ connection.start();
+
+ noLocalSessionProducer.send(noLocalSession.createTextMessage("Message1"));
+ noLocalSession.commit();
+ sessionProducer.send(session.createTextMessage("Message2"));
+ sessionProducer.send(session.createTextMessage("Message3"));
+ session.commit();
+
+ Message durableSubscriberMessage = noLocalSubscriber.receive(getReceiveTimeout());
+ assertTrue(durableSubscriberMessage instanceof TextMessage);
+ assertEquals("Unexpected local message received",
+ "Message2",
+ ((TextMessage) durableSubscriberMessage).getText());
+ noLocalSession.commit();
+
+ Message nonDurableSubscriberMessage = subscriber.receive(getReceiveTimeout());
+ assertTrue(nonDurableSubscriberMessage instanceof TextMessage);
+ assertEquals("Unexpected message received",
+ "Message1",
+ ((TextMessage) nonDurableSubscriberMessage).getText());
+
+ session.commit();
+ noLocalSubscriber.close();
+ subscriber.close();
+ }
+ finally
+ {
+ noLocalSession.unsubscribe(noLocalSubscriptionName);
+ session.unsubscribe(subscriobtionName);
+ }
+ }
+ finally
+ {
+ connection.close();
+ }
+ }
+ finally
+ {
+ noLocalConnection.close();
+ }
+ }
+
+ /**
+ * create and register a durable subscriber with a message selector and then close it
+ * crash the broker
+ * create a publisher and send 5 right messages and 5 wrong messages
+ * recreate the durable subscriber and check we receive the 5 expected messages
+ */
+ @Test
+ public void testMessageSelectorRecoveredOnBrokerRestart() throws Exception
+ {
+ assumeThat(getBrokerAdmin().supportsRestart(), is(true));
+
+ final Topic topic = createTopic(getTestName());
+
+ String clientId = "testClientId";
+ String subscriptionName = getTestName() + "_sub";
+ TopicConnection subscriberConnection =
+ (TopicConnection) getConnectionBuilder().setClientId(clientId).build();
+ try
+ {
+ TopicSession session = subscriberConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ TopicSubscriber subscriber =
+ session.createDurableSubscriber(topic, subscriptionName, "testprop='true'", false);
+ subscriberConnection.start();
+ subscriber.close();
+ session.close();
+ }
+ finally
+ {
+ subscriberConnection.close();
+ }
+
+ getBrokerAdmin().restart();
+
+ TopicConnection connection = (TopicConnection) getConnectionBuilder().setClientId(clientId).build();
+ try
+ {
+ TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ TopicPublisher publisher = session.createPublisher(topic);
+ for (int i = 0; i < 10; i++)
+ {
+ Message message = session.createMessage();
+ message.setStringProperty("testprop", String.valueOf(i % 2 == 0));
+ publisher.publish(message);
+ }
+ publisher.close();
+ session.close();
+ }
+ finally
+ {
+ connection.close();
+ }
+
+ TopicConnection subscriberConnection2 =
+ (TopicConnection) getConnectionBuilder().setClientId(clientId).build();
+ try
+ {
+ TopicSession session = subscriberConnection2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ TopicSubscriber subscriber =
+ session.createDurableSubscriber(topic, subscriptionName, "testprop='true'", false);
+ subscriberConnection2.start();
+ for (int i = 0; i < 5; i++)
+ {
+ Message message = subscriber.receive(1000);
+ if (message == null)
+ {
+ fail(String.format("Message '%d' was received", i));
+ }
+ else
+ {
+ assertTrue(String.format("Received message %d with not matching selector", i),
+ message.getStringProperty("testprop").equals("true"));
+ }
+ }
+ subscriber.close();
+ session.unsubscribe(subscriptionName);
+ }
+ finally
+ {
+ subscriberConnection2.close();
+ }
+ }
+
+ /**
+ * create and register a durable subscriber without a message selector and then unsubscribe it
+ * create and register a durable subscriber with a message selector and then close it
+ * restart the broker
+ * send matching and non matching messages
+ * recreate and register the durable subscriber with a message selector
+ * verify only the matching messages are received
+ */
+ @Test
+ public void testChangeSubscriberToHaveSelector() throws Exception
+ {
+ assumeThat(getBrokerAdmin().supportsRestart(), is(true));
+
+ final String subscriptionName = getTestName() + "_sub";
+ Topic topic = createTopic(getTestName());
+ String testClientId = "testClientId";
+
+ TopicConnection subscriberConnection =
+ (TopicConnection) getConnectionBuilder().setClientId(testClientId).build();
+ try
+ {
+ TopicSession session = subscriberConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ TopicSubscriber subscriber = session.createDurableSubscriber(topic, subscriptionName);
+
+ TopicPublisher publisher = session.createPublisher(topic);
+ publisher.send(session.createTextMessage("Message1"));
+ publisher.send(session.createTextMessage("Message2"));
+
+ subscriberConnection.start();
+ Message receivedMessage = subscriber.receive(getReceiveTimeout());
+ assertTrue(receivedMessage instanceof TextMessage);
+ assertEquals("Unexpected message content", "Message1", ((TextMessage) receivedMessage).getText());
+
+ subscriber.close();
+ session.close();
+ }
+ finally
+ {
+ subscriberConnection.close();
+ }
+
+ //create and register a durable subscriber with a message selector and then close it
+ TopicConnection subscriberConnection2 =
+ (TopicConnection) getConnectionBuilder().setClientId(testClientId).build();
+ try
+ {
+ TopicSession session = subscriberConnection2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ TopicSubscriber subscriber =
+ session.createDurableSubscriber(topic, subscriptionName, "testprop='true'", false);
+
+ TopicPublisher publisher = session.createPublisher(topic);
+ TextMessage message = session.createTextMessage("Message3");
+ message.setStringProperty("testprop", "false");
+ publisher.send(message);
+ message = session.createTextMessage("Message4");
+ message.setStringProperty("testprop", "true");
+ publisher.send(message);
+
+ subscriberConnection2.start();
+
+ Message receivedMessage = subscriber.receive(getReceiveTimeout());
+ assertTrue(receivedMessage instanceof TextMessage);
+ assertEquals("Unexpected message content", "Message4", ((TextMessage) receivedMessage).getText());
+
+ subscriber.close();
+ session.close();
+ }
+ finally
+ {
+ subscriberConnection2.close();
+ }
+
+ getBrokerAdmin().restart();
+
+ TopicConnection publisherConnection = (TopicConnection) getConnection();
+ try
+ {
+ TopicSession session = publisherConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ TopicPublisher publisher = session.createPublisher(topic);
+ for (int i = 0; i < 10; i++)
+ {
+ Message message = session.createMessage();
+ message.setStringProperty("testprop", String.valueOf(i % 2 == 0));
+ publisher.publish(message);
+ }
+ publisher.close();
+ session.close();
+ }
+ finally
+ {
+ publisherConnection.close();
+ }
+
+ TopicConnection subscriberConnection3 =
+ (TopicConnection) getConnectionBuilder().setClientId(testClientId).build();
+ try
+ {
+ TopicSession session = (TopicSession) subscriberConnection3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ TopicSubscriber subscriber =
+ session.createDurableSubscriber(topic, subscriptionName, "testprop='true'", false);
+ subscriberConnection3.start();
+
+ for (int i = 0; i < 5; i++)
+ {
+ Message message = subscriber.receive(2000);
+ if (message == null)
+ {
+ fail(String.format("Message '%d' was not received", i));
+ }
+ else
+ {
+ assertTrue(String.format("Received message %d with not matching selector", i),
+ message.getStringProperty("testprop").equals("true"));
+ }
+ }
+
+ subscriber.close();
+ session.unsubscribe(subscriptionName);
+ session.close();
+ }
+ finally
+ {
+ subscriberConnection3.close();
+ }
+ }
+
+
+ /**
+ * create and register a durable subscriber with a message selector and then unsubscribe it
+ * create and register a durable subscriber without a message selector and then close it
+ * restart the broker
+ * send matching and non matching messages
+ * recreate and register the durable subscriber without a message selector
+ * verify ALL the sent messages are received
+ */
+ @Test
+ public void testChangeSubscriberToHaveNoSelector() throws Exception
+ {
+ assumeThat(getBrokerAdmin().supportsRestart(), is(true));
+
+ final String subscriptionName = getTestName() + "_sub";
+ Topic topic = createTopic(getTestName());
+ String clientId = "testClientId";
+
+ //create and register a durable subscriber with selector then unsubscribe it
+ TopicConnection durConnection = (TopicConnection) getConnectionBuilder().setClientId(clientId).build();
+ try
+ {
+ TopicSession session = durConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ TopicSubscriber subscriber =
+ session.createDurableSubscriber(topic, subscriptionName, "testprop='true'", false);
+
+ TopicPublisher publisher = session.createPublisher(topic);
+ TextMessage message = session.createTextMessage("Messag1");
+ message.setStringProperty("testprop", "false");
+ publisher.send(message);
+ message = session.createTextMessage("Message2");
+ message.setStringProperty("testprop", "true");
+ publisher.send(message);
+
+ message = session.createTextMessage("Message3");
+ message.setStringProperty("testprop", "true");
+ publisher.send(message);
+
+ durConnection.start();
+
+ Message receivedMessage = subscriber.receive(getReceiveTimeout());
+ assertTrue(receivedMessage instanceof TextMessage);
+ assertEquals("Unexpected message content", "Message2", ((TextMessage) receivedMessage).getText());
+
+ subscriber.close();
+ session.close();
+ }
+ finally
+ {
+ durConnection.close();
+ }
+
+ //create and register a durable subscriber without the message selector and then close it
+ TopicConnection subscriberConnection2 =
+ (TopicConnection) getConnectionBuilder().setClientId(clientId).build();
+ try
+ {
+ TopicSession session = subscriberConnection2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ TopicSubscriber subscriber = session.createDurableSubscriber(topic, subscriptionName);
+ subscriberConnection2.start();
+ subscriber.close();
+ session.close();
+ }
+ finally
+ {
+ subscriberConnection2.close();
+ }
+
+ //send messages matching and not matching the original used selector
+ TopicConnection publisherConnection = (TopicConnection) getConnection();
+ try
+ {
+ TopicSession session = publisherConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ TopicPublisher publisher = session.createPublisher(topic);
+ for (int i = 1; i <= 10; i++)
+ {
+ Message message = session.createMessage();
+ message.setStringProperty("testprop", String.valueOf(i % 2 == 0));
+ publisher.publish(message);
+ }
+ publisher.close();
+ session.close();
+ }
+ finally
+ {
+ publisherConnection.close();
+ }
+
+ getBrokerAdmin().restart();
+
+ TopicConnection subscriberConnection3 =
+ (TopicConnection) getConnectionBuilder().setClientId(clientId).build();
+ try
+ {
+ TopicSession session = (TopicSession) subscriberConnection3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ TopicSubscriber subscriber = session.createDurableSubscriber(topic, subscriptionName);
+ subscriberConnection3.start();
+
+ for (int i = 1; i <= 10; i++)
+ {
+ Message message = subscriber.receive(2000);
+ if (message == null)
+ {
+ fail(String.format("Message %d was not received", i));
+ }
+ }
+
+ subscriber.close();
+ session.unsubscribe(subscriptionName);
+ session.close();
+ }
+ finally
+ {
+ subscriberConnection3.close();
+ }
+ }
+
+ @Test
+ public void testResubscribeWithChangedSelector() throws Exception
+ {
+ assumeThat(getBrokerAdmin().supportsRestart(), is(true));
+
+ String subscriptionName = getTestName() + "_sub";
+ Topic topic = createTopic(getTestName());
+ String clientId = "testClientId";
+
+ TopicConnection connection = (TopicConnection) getConnectionBuilder().setClientId(clientId).build();
+ try
+ {
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(topic);
+
+ // Create durable subscriber that matches A
+ TopicSubscriber subscriberA =
+ session.createDurableSubscriber(topic, subscriptionName, "Match = True", false);
+
+ // Send 1 non-matching message and 1 matching message
+ TextMessage message = session.createTextMessage("Message1");
+ message.setBooleanProperty("Match", false);
+ producer.send(message);
+ message = session.createTextMessage("Message2");
+ message.setBooleanProperty("Match", true);
+ producer.send(message);
+
+ Message receivedMessage = subscriberA.receive(getReceiveTimeout());
+ assertTrue(receivedMessage instanceof TextMessage);
+ assertEquals("Unexpected message content", "Message2", ((TextMessage) receivedMessage).getText());
+
+ // Send another 1 matching message and 1 non-matching message
+ message = session.createTextMessage("Message3");
+ message.setBooleanProperty("Match", true);
+ producer.send(message);
+ message = session.createTextMessage("Message4");
+ message.setBooleanProperty("Match", false);
+ producer.send(message);
+
+ // Disconnect subscriber without receiving the message to
+ //leave it on the underlying queue
+ subscriberA.close();
+
+ // Reconnect with new selector that matches B
+ TopicSubscriber subscriberB = session.createDurableSubscriber(topic,
+ subscriptionName,
+ "Match = False", false);
+
+ // Check that new messages are received properly
+ message = session.createTextMessage("Message5");
+ message.setBooleanProperty("Match", true);
+ producer.send(message);
+ message = session.createTextMessage("Message6");
+ message.setBooleanProperty("Match", false);
+ producer.send(message);
+
+ // changing the selector should have cleared the queue so we expect message 6 instead of message 4
+ receivedMessage = subscriberB.receive(getReceiveTimeout());
+ assertTrue(receivedMessage instanceof TextMessage);
+ assertEquals("Unexpected message content", "Message6", ((TextMessage) receivedMessage).getText());
+
+ // publish a message to be consumed after restart
+ message = session.createTextMessage("Message7");
+ message.setBooleanProperty("Match", true);
+ producer.send(message);
+ message = session.createTextMessage("Message8");
+ message.setBooleanProperty("Match", false);
+ producer.send(message);
+ session.close();
+ }
+ finally
+ {
+ connection.close();
+ }
+
+ //now restart the server
+ getBrokerAdmin().restart();
+
+ // Reconnect to broker
+ TopicConnection connection2 = (TopicConnection) getConnectionBuilder().setClientId(clientId).build();
+ try
+ {
+ connection2.start();
+ Session session = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Reconnect with new selector that matches B
+ TopicSubscriber subscriberC =
+ session.createDurableSubscriber(topic, subscriptionName, "Match = False", false);
+
+ //check the dur sub's underlying queue now has msg count 1
+ Message receivedMessage = subscriberC.receive(getReceiveTimeout());
+ assertTrue(receivedMessage instanceof TextMessage);
+ assertEquals("Unexpected message content", "Message8", ((TextMessage) receivedMessage).getText());
+
+ subscriberC.close();
+ session.unsubscribe(subscriptionName);
+
+ session.close();
+ }
+ finally
+ {
+ connection2.close();
+ }
+ }
+
+ private int getQueueCount() throws Exception
+ {
+ Map<String, Object> statisticsMap = getVirtualHostStatistics("queueCount");
+ return ((Number) statisticsMap.get("queueCount")).intValue();
+ }
+
+ private long getTotalDepthOfQueuesMessages() throws Exception
+ {
+ Map<String, Object> statisticsMap = getVirtualHostStatistics("totalDepthOfQueuesMessages");
+ return ((Number) statisticsMap.get("totalDepthOfQueuesMessages")).intValue();
+ }
+
+ @SuppressWarnings("unchecked")
+ private Map<String, Object> getVirtualHostStatistics(final String... statisticsName) throws Exception
+ {
+ Map<String, Object> arguments = Collections.singletonMap("statistics", Arrays.asList(statisticsName));
+ Object statistics = performOperationUsingAmqpManagement(getVirtualHostName(),
+ "getStatistics",
+ "org.apache.qpid.VirtualHost",
+ arguments);
+
+ assertNotNull("Statistics is null", statistics);
+ assertTrue("Statistics is not map", statistics instanceof Map);
+
+ return (Map<String, Object>) statistics;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/59218fdc/systests/src/test/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java b/systests/src/test/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
deleted file mode 100644
index 7282d41..0000000
--- a/systests/src/test/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
+++ /dev/null
@@ -1,426 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.test.unit.ct;
-
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.TopicConnection;
-import javax.jms.TopicPublisher;
-import javax.jms.TopicSession;
-import javax.jms.TopicSubscriber;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
-/**
- * Crash Recovery tests for durable subscription
- *
- */
-public class DurableSubscriberTest extends QpidBrokerTestCase
-{
- private static final Logger LOGGER = LoggerFactory.getLogger(DurableSubscriberTest.class);
- private final String _topicName = "durableSubscriberTopic";
-
- /**
- * create and register a durable subscriber with a message selector and then close it
- * crash the broker
- * create a publisher and send 5 right messages and 5 wrong messages
- * recreate the durable subscriber and check we receive the 5 expected messages
- */
- public void testDurSubRestoresMessageSelector() throws Exception
- {
- if (isBrokerStorePersistent())
- {
- //create and register a durable subscriber with a message selector and then close it
- TopicConnection durConnection = (TopicConnection) getConnectionBuilder().setClientId("testClientId").build();
- final Topic topic = createTopic(durConnection, _topicName);
- TopicSession durSession = durConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
- TopicSubscriber durSub1 = durSession.createDurableSubscriber(topic, "dursub", "testprop='true'", false);
- durConnection.start();
- durSub1.close();
- durSession.close();
- durConnection.stop();
- //now stop the server
- try
- {
- restartDefaultBroker();
- }
- catch (Exception e)
- {
- LOGGER.error("problems restarting broker: " + e);
- throw e;
- }
- TopicConnection pubConnection = (TopicConnection) getConnection();
- TopicSession pubSession = pubConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
- TopicPublisher publisher = pubSession.createPublisher(topic);
- for (int i = 0; i < 5; i++)
- {
- Message message = pubSession.createMessage();
- message.setStringProperty("testprop", "true");
- publisher.publish(message);
- message = pubSession.createMessage();
- message.setStringProperty("testprop", "false");
- publisher.publish(message);
- }
- publisher.close();
- pubSession.close();
-
- //now recreate the durable subscriber and check the received messages
- TopicConnection durConnection2 = (TopicConnection) getConnectionBuilder().setClientId("testClientId").build();
- TopicSession durSession2 = durConnection2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
- TopicSubscriber durSub2 = durSession2.createDurableSubscriber(topic, "dursub", "testprop='true'", false);
- durConnection2.start();
- for (int i = 0; i < 5; i++)
- {
- Message message = durSub2.receive(1000);
- if (message == null)
- {
- assertTrue("testDurSubRestoresMessageSelector test failed. no message was returned", false);
- }
- else
- {
- assertTrue("testDurSubRestoresMessageSelector test failed. message selector not reset",
- message.getStringProperty("testprop").equals("true"));
- }
- }
- durSub2.close();
- durSession2.unsubscribe("dursub");
- durConnection2.close();
- }
- }
-
- /**
- * create and register a durable subscriber without a message selector and then unsubscribe it
- * create and register a durable subscriber with a message selector and then close it
- * restart the broker
- * send matching and non matching messages
- * recreate and register the durable subscriber with a message selector
- * verify only the matching messages are received
- */
- public void testDurSubChangedToHaveSelectorThenRestart() throws Exception
- {
- if (! isBrokerStorePersistent())
- {
- LOGGER.warn("Test skipped due to requirement of a persistent store");
- return;
- }
-
- final String SUB_NAME=getTestQueueName();
-
- //create and register a durable subscriber then unsubscribe it
- TopicConnection durConnection = (TopicConnection) getConnectionBuilder().setClientId("testClientId").build();
- Topic topic = createTopic(durConnection, _topicName);
- TopicSession durSession = durConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
- TopicSubscriber durSub1 = durSession.createDurableSubscriber(topic, SUB_NAME);
- durConnection.start();
- durSub1.close();
- durSession.unsubscribe(SUB_NAME);
- durSession.close();
- durConnection.close();
-
- //create and register a durable subscriber with a message selector and then close it
- TopicConnection durConnection2 = (TopicConnection) getConnectionBuilder().setClientId("testClientId").build();
- TopicSession durSession2 = durConnection2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
- TopicSubscriber durSub2 = durSession2.createDurableSubscriber(topic, SUB_NAME, "testprop='true'", false);
- durConnection2.start();
- durSub2.close();
- durSession2.close();
- durConnection2.close();
-
- //now restart the server
- try
- {
- restartDefaultBroker();
- }
- catch (Exception e)
- {
- LOGGER.error("problems restarting broker: " + e);
- throw e;
- }
-
- //send messages matching and not matching the selector
- TopicConnection pubConnection = (TopicConnection) getConnection();
- TopicSession pubSession = pubConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
- TopicPublisher publisher = pubSession.createPublisher(topic);
- for (int i = 0; i < 5; i++)
- {
- Message message = pubSession.createMessage();
- message.setStringProperty("testprop", "true");
- publisher.publish(message);
- message = pubSession.createMessage();
- message.setStringProperty("testprop", "false");
- publisher.publish(message);
- }
- publisher.close();
- pubSession.close();
-
- //now recreate the durable subscriber with selector to check there are no exceptions generated
- //and then verify the messages are received correctly
- TopicConnection durConnection3 = (TopicConnection) getConnectionBuilder().setClientId("testClientId").build();
- TopicSession durSession3 = (TopicSession) durConnection3.createSession(false, Session.AUTO_ACKNOWLEDGE);
- TopicSubscriber durSub3 = durSession3.createDurableSubscriber(topic, SUB_NAME, "testprop='true'", false);
- durConnection3.start();
-
- for (int i = 0; i < 5; i++)
- {
- Message message = durSub3.receive(2000);
- if (message == null)
- {
- fail("testDurSubChangedToHaveSelectorThenRestart test failed. Expected message " + i + " was not returned");
- }
- else
- {
- assertTrue("testDurSubChangedToHaveSelectorThenRestart test failed. Got message not matching selector",
- message.getStringProperty("testprop").equals("true"));
- }
- }
-
- durSub3.close();
- durSession3.unsubscribe(SUB_NAME);
- durSession3.close();
- durConnection3.close();
- }
-
-
- /**
- * create and register a durable subscriber with a message selector and then unsubscribe it
- * create and register a durable subscriber without a message selector and then close it
- * restart the broker
- * send matching and non matching messages
- * recreate and register the durable subscriber without a message selector
- * verify ALL the sent messages are received
- */
- public void testDurSubChangedToNotHaveSelectorThenRestart() throws Exception
- {
- if (! isBrokerStorePersistent())
- {
- LOGGER.warn("Test skipped due to requirement of a persistent store");
- return;
- }
-
- final String SUB_NAME=getTestQueueName();
-
- //create and register a durable subscriber with selector then unsubscribe it
- TopicConnection durConnection = (TopicConnection) getConnectionBuilder().setClientId("testClientId").build();
- Topic topic = createTopic(durConnection, _topicName);
- TopicSession durSession = durConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
- TopicSubscriber durSub1 = durSession.createDurableSubscriber(topic, SUB_NAME, "testprop='true'", false);
- durConnection.start();
- durSub1.close();
- durSession.unsubscribe(SUB_NAME);
- durSession.close();
- durConnection.close();
-
- //create and register a durable subscriber without the message selector and then close it
- TopicConnection durConnection2 = (TopicConnection) getConnectionBuilder().setClientId("testClientId").build();
- TopicSession durSession2 = durConnection2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
- TopicSubscriber durSub2 = durSession2.createDurableSubscriber(topic, SUB_NAME);
- durConnection2.start();
- durSub2.close();
- durSession2.close();
- durConnection2.close();
-
- //now restart the server
- try
- {
- restartDefaultBroker();
- }
- catch (Exception e)
- {
- LOGGER.error("problems restarting broker: " + e);
- throw e;
- }
-
- //send messages matching and not matching the original used selector
- TopicConnection pubConnection = (TopicConnection) getConnection();
- TopicSession pubSession = pubConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
- TopicPublisher publisher = pubSession.createPublisher(topic);
- for (int i = 1; i <= 5; i++)
- {
- Message message = pubSession.createMessage();
- message.setStringProperty("testprop", "true");
- publisher.publish(message);
- message = pubSession.createMessage();
- message.setStringProperty("testprop", "false");
- publisher.publish(message);
- }
- publisher.close();
- pubSession.close();
-
- //now recreate the durable subscriber without selector to check there are no exceptions generated
- //then verify ALL messages sent are received
- TopicConnection durConnection3 = (TopicConnection) getConnectionBuilder().setClientId("testClientId").build();
- TopicSession durSession3 = (TopicSession) durConnection3.createSession(false, Session.AUTO_ACKNOWLEDGE);
- TopicSubscriber durSub3 = durSession3.createDurableSubscriber(topic, SUB_NAME);
- durConnection3.start();
-
- for (int i = 1; i <= 10; i++)
- {
- Message message = durSub3.receive(2000);
- if (message == null)
- {
- fail("testDurSubChangedToNotHaveSelectorThenRestart test failed. Expected message " + i + " was not received");
- }
- }
-
- durSub3.close();
- durSession3.unsubscribe(SUB_NAME);
- durSession3.close();
- durConnection3.close();
- }
-
-
- public void testResubscribeWithChangedSelectorAndRestart() throws Exception
- {
- if (! isBrokerStorePersistent())
- {
- LOGGER.warn("Test skipped due to requirement of a persistent store");
- return;
- }
-
- TopicConnection conn = (TopicConnection) getConnection();
- conn.start();
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Topic topic = createTopic(conn, "testResubscribeWithChangedSelectorAndRestart");
- MessageProducer producer = session.createProducer(topic);
-
- // Create durable subscriber that matches A
- TopicSubscriber subA = session.createDurableSubscriber(topic,
- "testResubscribeWithChangedSelectorAndRestart",
- "Match = True", false);
-
- // Send 1 matching message and 1 non-matching message
- TextMessage msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1");
- msg.setBooleanProperty("Match", true);
- producer.send(msg);
- msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
- msg.setBooleanProperty("Match", false);
- producer.send(msg);
-
- Message rMsg = subA.receive(getReceiveTimeout());
- assertNotNull(rMsg);
- assertEquals("Content was wrong",
- "testResubscribeWithChangedSelectorAndRestart1",
- ((TextMessage) rMsg).getText());
-
- // Queue has no messages left
- rMsg = subA.receive(getReceiveTimeout());
- assertNull(rMsg);
-
- // Send another 1 matching message and 1 non-matching message
- msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1");
- msg.setBooleanProperty("Match", true);
- producer.send(msg);
- msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
- msg.setBooleanProperty("Match", false);
- producer.send(msg);
-
- // Disconnect subscriber without receiving the message to
- //leave it on the underlying queue
- subA.close();
-
- // Reconnect with new selector that matches B
- TopicSubscriber subB = session.createDurableSubscriber(topic,
- "testResubscribeWithChangedSelectorAndRestart",
- "Match = false", false);
-
- //verify no messages are now present on the queue as changing selector should have issued
- //an unsubscribe and thus deleted the previous durable backing queue for the subscription.
- //check the dur sub's underlying queue now has msg count 0
- rMsg = subB.receive(getReceiveTimeout());
- assertNull(rMsg);
-
- // Check that new messages are received properly
- msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1");
- msg.setBooleanProperty("Match", true);
- producer.send(msg);
- msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
- msg.setBooleanProperty("Match", false);
- producer.send(msg);
-
- rMsg = subB.receive(getReceiveTimeout());
- assertNotNull(rMsg);
- assertEquals("Content was wrong",
- "testResubscribeWithChangedSelectorAndRestart2",
- ((TextMessage) rMsg).getText());
-
- //check the dur sub's underlying queue now has msg count 0
- rMsg = subB.receive(getReceiveTimeout());
- assertNull(rMsg);
-
- conn.close();
-
- //now restart the server
- try
- {
- restartDefaultBroker();
- }
- catch (Exception e)
- {
- LOGGER.error("problems restarting broker: " + e);
- throw e;
- }
-
- // Reconnect to broker
- TopicConnection connection = (TopicConnection) getConnection();
- connection.start();
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- topic = createTopic(connection, "testResubscribeWithChangedSelectorAndRestart");
- producer = session.createProducer(topic);
-
- // Reconnect with new selector that matches B
- TopicSubscriber subC = session.createDurableSubscriber(topic,
- "testResubscribeWithChangedSelectorAndRestart",
- "Match = False", false);
-
- //verify no messages now present on the queue after we restart the broker
- //check the dur sub's underlying queue now has msg count 0
- rMsg = subC.receive(getReceiveTimeout());
- assertNull(rMsg);
-
- // Check that new messages are still sent and recieved properly
- msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1");
- msg.setBooleanProperty("Match", true);
- producer.send(msg);
- msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
- msg.setBooleanProperty("Match", false);
- producer.send(msg);
-
- //check the dur sub's underlying queue now has msg count 1
- rMsg = subC.receive(getReceiveTimeout());
- assertNotNull(rMsg);
- assertEquals("Content was wrong",
- "testResubscribeWithChangedSelectorAndRestart2",
- ((TextMessage) rMsg).getText());
-
- rMsg = subC.receive(getReceiveTimeout());
- assertNull(rMsg);
-
- subC.close();
- session.unsubscribe("testResubscribeWithChangedSelectorAndRestart");
-
- session.close();
- connection.close();
- }
-}
-
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org