You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2018/04/11 16:29:12 UTC
[6/6] qpid-broker-j git commit: QPID-8158: [Broker-J] [System Tests]
Refactor remaining system tests extending QpidBrokerTestCase
QPID-8158: [Broker-J] [System Tests] Refactor remaining system tests extending QpidBrokerTestCase
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/3a6893e4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/3a6893e4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/3a6893e4
Branch: refs/heads/master
Commit: 3a6893e464ec320a7bf4d8ab988c914be8b8f8d8
Parents: 7f0219c
Author: Alex Rudyy <or...@apache.org>
Authored: Wed Apr 11 15:47:34 2018 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Wed Apr 11 17:27:23 2018 +0100
----------------------------------------------------------------------
bdbstore/systests/pom.xml | 7 +-
.../berkeleydb/BDBAMQP10V0UpgradeTest.java | 117 ++--
.../server/store/berkeleydb/BDBBackupTest.java | 147 -----
.../server/store/berkeleydb/BDBUpgradeTest.java | 558 +++++++++----------
.../store/berkeleydb/UpgradeTestBase.java | 88 +++
.../BDBHAVirtualHostNodeRestTest.java | 471 ----------------
.../replication/BDBHAVirtualHostRestTest.java | 168 ------
.../berkeleydb/replication/GroupCreator.java | 514 -----------------
.../replication/GroupJmsTestBase.java | 9 +
pom.xml | 6 +
qpid-perftests-systests/pom.xml | 20 +-
.../systest/disttest/endtoend/EndToEndTest.java | 176 +++---
.../disttest/perftests.systests.properties | 29 -
.../apache/qpid/tests/http/HttpTestBase.java | 2 +
.../qpid/systests/AmqpManagementFacade.java | 54 +-
.../apache/qpid/systests/ConnectionBuilder.java | 1 +
.../org/apache/qpid/systests/JmsTestBase.java | 4 +-
.../QpidJmsClient0xConnectionBuilder.java | 37 +-
.../QpidJmsClientConnectionBuilder.java | 33 +-
.../qpid/systests/admin/SpawnBrokerAdmin.java | 19 +-
.../src/main/resources/spawn-broker.json | 3 +
21 files changed, 639 insertions(+), 1824 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3a6893e4/bdbstore/systests/pom.xml
----------------------------------------------------------------------
diff --git a/bdbstore/systests/pom.xml b/bdbstore/systests/pom.xml
index eefb5bc..b81f3b1 100644
--- a/bdbstore/systests/pom.xml
+++ b/bdbstore/systests/pom.xml
@@ -57,7 +57,12 @@
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-systests-spawn-admin</artifactId>
- <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-systests-jms-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3a6893e4/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBAMQP10V0UpgradeTest.java
----------------------------------------------------------------------
diff --git a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBAMQP10V0UpgradeTest.java b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBAMQP10V0UpgradeTest.java
index 4d3d9bd..2d8fd16 100644
--- a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBAMQP10V0UpgradeTest.java
+++ b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBAMQP10V0UpgradeTest.java
@@ -19,10 +19,22 @@
*/
package org.apache.qpid.server.store.berkeleydb;
+import static org.apache.qpid.systests.JmsTestBase.DEFAULT_BROKER_CONFIG;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeThat;
+
import java.io.File;
+import java.io.IOException;
import java.io.InputStream;
-import java.nio.file.Files;
import java.security.MessageDigest;
+import java.util.Collections;
+import java.util.Map;
import javax.jms.BytesMessage;
import javax.jms.Connection;
@@ -32,11 +44,16 @@ import javax.jms.Queue;
import javax.jms.Session;
import javax.xml.bind.DatatypeConverter;
-import org.apache.qpid.server.model.VirtualHostNode;
-import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBVirtualHostNode;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.test.utils.TestBrokerConfiguration;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.util.FileUtils;
+import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBVirtualHostNode;
+import org.apache.qpid.systests.JmsTestBase;
+import org.apache.qpid.tests.utils.ConfigItem;
+import org.apache.qpid.tests.utils.RunBrokerAdmin;
/**
*
@@ -51,77 +68,61 @@ import org.apache.qpid.server.util.FileUtils;
* messageProducer.send(message);
*
*/
-public class BDBAMQP10V0UpgradeTest extends QpidBrokerTestCase
+@ConfigItem(name = "qpid.initialConfigurationLocation", value = DEFAULT_BROKER_CONFIG )
+public class BDBAMQP10V0UpgradeTest extends UpgradeTestBase
{
- private static final int EXPECTED_MESSAGE_LENGTH = 256 * 1024;
+ private static final long EXPECTED_MESSAGE_LENGTH = 256 * 1024;
- private String _storeLocation;
-
- @Override
- public void setUp() throws Exception
+ @BeforeClass
+ public static void verifyClient()
{
- _storeLocation = Files.createTempDirectory("qpid-work-" + getClassQualifiedTestName() + "-bdb-store").toString();
- TestBrokerConfiguration brokerConfiguration = getDefaultBrokerConfiguration();
- brokerConfiguration.setObjectAttribute(VirtualHostNode.class, TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, BDBVirtualHostNode.STORE_PATH, _storeLocation );
-
- //Clear the two target directories if they exist.
- File directory = new File(_storeLocation);
- if (directory.exists() && directory.isDirectory())
- {
- FileUtils.delete(directory, true);
- }
- directory.mkdirs();
-
- // copy store files
- InputStream src = getClass().getClassLoader().getResourceAsStream("upgrade/bdbstore-v9-amqp10v0/test-store/00000000.jdb");
- FileUtils.copy(src, new File(_storeLocation, "00000000.jdb"));
-
- super.setUp();
+ assumeThat(System.getProperty("virtualhostnode.type", "BDB"), is(equalTo("BDB")));
+ assumeThat(getProtocol(), is(equalTo(Protocol.AMQP_1_0)));
}
- @Override
- public void tearDown() throws Exception
+ @Test
+ public void testRecoverAmqpV0Message() throws Exception
{
+ Connection connection = getConnectionBuilder().setVirtualHost("test").build();
try
{
- super.tearDown();
+ connection.start();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue("queue");
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ Message message = consumer.receive(getReceiveTimeout());
+ assertThat("Recovered message not received", message, is(instanceOf(BytesMessage.class)));
+ BytesMessage bytesMessage = ((BytesMessage) message);
+
+ long length = bytesMessage.getBodyLength();
+ String expectedContentHash = message.getStringProperty("sha256hash");
+ byte[] content = new byte[(int) length];
+ bytesMessage.readBytes(content);
+
+ assertThat("Unexpected content length", length, is(equalTo(EXPECTED_MESSAGE_LENGTH)));
+ assertThat("Message should carry expectedShaHash property", expectedContentHash, is(notNullValue()));
+
+ String contentHash = computeContentHash(content);
+ assertThat("Unexpected content hash", expectedContentHash, is(equalTo(contentHash)));
+ session.commit();
}
finally
{
- FileUtils.delete(new File(_storeLocation), true);
+ connection.close();
}
}
- public void testRecoverAmqpV0Message() throws Exception
- {
- Connection connection = getConnection();
- connection.start();
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- Queue queue = createTestQueue(session, "queue");
- MessageConsumer consumer = session.createConsumer(queue);
-
- Message message = consumer.receive(getReceiveTimeout());
- assertNotNull("Recovered message not received", message);
- assertTrue(message instanceof BytesMessage);
- BytesMessage bytesMessage = ((BytesMessage) message);
-
- long length = bytesMessage.getBodyLength();
- String expectedContentHash = message.getStringProperty("sha256hash");
- byte[] content = new byte[(int) length];
- bytesMessage.readBytes(content);
-
- assertEquals("Unexpected content length", EXPECTED_MESSAGE_LENGTH, length);
- assertNotNull("Message should carry expectedShaHash property", expectedContentHash);
-
- String contentHash = computeContentHash(content);
- assertEquals("Unexpected content hash", expectedContentHash, contentHash);
- session.commit();
- }
-
private String computeContentHash(final byte[] content) throws Exception
{
MessageDigest digest = MessageDigest.getInstance("SHA-256");
byte[] hash = digest.digest(content);
return DatatypeConverter.printHexBinary(hash);
}
+
+ @Override
+ String getOldStoreResourcePath()
+ {
+ return "upgrade/bdbstore-v9-amqp10v0/test-store/00000000.jdb";
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3a6893e4/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java
----------------------------------------------------------------------
diff --git a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java
deleted file mode 100644
index aac33c4..0000000
--- a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import java.io.File;
-import java.util.Map;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.server.model.VirtualHostNode;
-import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBVirtualHostNode;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.server.util.FileUtils;
-import org.apache.qpid.server.util.Strings;
-
-/**
- * Tests the BDB backup can successfully perform a backup and that
- * backup can be restored and used by the Broker.
- */
-public class BDBBackupTest extends QpidBrokerTestCase
-{
- protected static final Logger LOGGER = LoggerFactory.getLogger(BDBBackupTest.class);
-
- private static final String TEST_VHOST = "test";
- private static final String SYSTEM_TMP_DIR = System.getProperty("java.io.tmpdir");
-
- private File _backupToDir;
- private File _backupFromDir;
-
- @Override
- public void setUp() throws Exception
- {
- super.setUp();
- _backupToDir = new File(SYSTEM_TMP_DIR + File.separator + getTestName());
- _backupToDir.mkdirs();
-
- Map<String, Object> virtualHostNodeAttributes = getDefaultBrokerConfiguration().getObjectAttributes(VirtualHostNode.class, TEST_VHOST);
- setSystemProperty("qpid.work_dir", getDefaultBroker().getWorkDir().toString());
- _backupFromDir = new File(Strings.expand((String) virtualHostNodeAttributes.get(BDBVirtualHostNode.STORE_PATH)));
- boolean fromDirExistsAndIsDir = _backupFromDir.isDirectory();
- assertTrue("backupFromDir " + _backupFromDir + " should already exist", fromDirExistsAndIsDir);
- }
-
- @Override
- public void tearDown() throws Exception
- {
- try
- {
- super.tearDown();
- }
- finally
- {
- FileUtils.delete(_backupToDir, true);
- }
- }
-
- public void testBackupAndRestoreMaintainsMessages() throws Exception
- {
- sendNumberedMessages(0, 10);
- invokeBdbBackup(_backupFromDir, _backupToDir);
- sendNumberedMessages(10, 20);
- confirmBrokerHasMessages(0, 20);
- stopDefaultBroker();
-
- deleteStore(_backupFromDir);
- replaceStoreWithBackup(_backupToDir, _backupFromDir);
-
- startDefaultBroker();
- confirmBrokerHasMessages(0, 10);
- }
-
- private void sendNumberedMessages(final int startIndex, final int endIndex) throws JMSException, Exception
- {
- Connection con = getConnection();
- Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination destination = createTestQueue(session);
-
- final int numOfMessages = endIndex - startIndex;
- final int batchSize = 0;
- sendMessage(session, destination, numOfMessages, startIndex, batchSize);
- con.close();
- }
-
- private void confirmBrokerHasMessages(final int startIndex, final int endIndex) throws Exception
- {
- Connection con = getConnection();
- Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- con.start();
- Destination destination = session.createQueue(getTestQueueName());
- MessageConsumer consumer = session.createConsumer(destination);
- for (int i = startIndex; i < endIndex; i++)
- {
- Message msg = consumer.receive(RECEIVE_TIMEOUT);
- assertNotNull("Message " + i + " not received", msg);
- assertEquals("Did not receive the expected message", i, msg.getIntProperty(INDEX));
- }
-
- Message msg = consumer.receive(100);
- if(msg != null)
- {
- fail("No more messages should be received, but received additional message with index: " + msg.getIntProperty(INDEX));
- }
- con.close();
- }
-
- private void invokeBdbBackup(final File backupFromDir, final File backupToDir) throws Exception
- {
- BDBBackup.main(new String[]{"-todir", backupToDir.getAbsolutePath(), "-fromdir", backupFromDir.getAbsolutePath()});
- }
-
- private void replaceStoreWithBackup(File source, File dst) throws Exception
- {
- LOGGER.debug("Copying store " + source + " to " + dst);
- FileUtils.copyRecursive(source, dst);
- }
-
- private void deleteStore(File storeDir)
- {
- LOGGER.debug("Deleting store " + storeDir);
- FileUtils.delete(storeDir, true);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3a6893e4/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
----------------------------------------------------------------------
diff --git a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
index bf33786..5de003e 100644
--- a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
+++ b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
@@ -20,20 +20,29 @@
*/
package org.apache.qpid.server.store.berkeleydb;
+import static org.apache.qpid.systests.JmsTestBase.DEFAULT_BROKER_CONFIG;
+import static org.apache.qpid.systests.Utils.INDEX;
+import static org.apache.qpid.systests.Utils.sendMessages;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeThat;
+
import java.io.File;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
-import java.nio.file.Files;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
@@ -46,83 +55,49 @@ import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
import org.apache.qpid.server.model.AlternateBinding;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.ExclusivityPolicy;
-import org.apache.qpid.server.model.VirtualHostNode;
+import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.util.FileUtils;
import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBVirtualHostNode;
-import org.apache.qpid.systest.rest.RestTestHelper;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.test.utils.TestBrokerConfiguration;
+import org.apache.qpid.systests.AmqpManagementFacade;
+import org.apache.qpid.systests.JmsTestBase;
+import org.apache.qpid.tests.utils.ConfigItem;
+import org.apache.qpid.tests.utils.EmbeddedBrokerPerClassAdminImpl;
+import org.apache.qpid.tests.utils.RunBrokerAdmin;
/**
* Tests upgrading a BDB store on broker startup.
* The store will then be used to verify that the upgrade is completed
* properly and that once upgraded it functions as expected.
- *
+ * <p>
* Store prepared using old client/broker with BDBStoreUpgradeTestPreparer.
*/
-public class BDBUpgradeTest extends QpidBrokerTestCase
+@ConfigItem(name = "qpid.initialConfigurationLocation", value = DEFAULT_BROKER_CONFIG )
+public class BDBUpgradeTest extends UpgradeTestBase
{
private static final String STRING_1024 = generateString(1024);
- private static final String STRING_1024_256 = generateString(1024*256);
-
- private static final String TOPIC_NAME="myUpgradeTopic";
- private static final String SUB_NAME="myDurSubName";
- private static final String SELECTOR_SUB_NAME="mySelectorDurSubName";
- private static final String SELECTOR_TOPIC_NAME="mySelectorUpgradeTopic";
- private static final String QUEUE_NAME="myUpgradeQueue";
- private static final String NON_DURABLE_QUEUE_NAME="queue-non-durable";
- private static final String PRIORITY_QUEUE_NAME="myPriorityQueue";
- private static final String QUEUE_WITH_DLQ_NAME="myQueueWithDLQ";
-
- private String _storeLocation;
- private RestTestHelper _restTestHelper;
-
- @Override
- public void setUp() throws Exception
- {
- _storeLocation = Files.createTempDirectory("qpid-work-" + getClassQualifiedTestName() + "-bdb-store").toString();
- TestBrokerConfiguration brokerConfiguration = getDefaultBrokerConfiguration();
- brokerConfiguration.addHttpManagementConfiguration();
- brokerConfiguration.setObjectAttribute(VirtualHostNode.class, TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, BDBVirtualHostNode.STORE_PATH, _storeLocation );
-
- //Clear the two target directories if they exist.
- File directory = new File(_storeLocation);
- if (directory.exists() && directory.isDirectory())
- {
- FileUtils.delete(directory, true);
- }
- directory.mkdirs();
-
- // copy store files
- InputStream src = getClass().getClassLoader().getResourceAsStream("upgrade/bdbstore-v4/test-store/00000000.jdb");
- FileUtils.copy(src, new File(_storeLocation, "00000000.jdb"));
-
- super.setUp();
- _restTestHelper = new RestTestHelper(getDefaultBroker().getHttpPort());
- }
-
- @Override
- public void tearDown() throws Exception
+ private static final String STRING_1024_256 = generateString(1024 * 256);
+
+ private static final String TOPIC_NAME = "myUpgradeTopic";
+ private static final String SUB_NAME = "myDurSubName";
+ private static final String SELECTOR_SUB_NAME = "mySelectorDurSubName";
+ private static final String SELECTOR_TOPIC_NAME = "mySelectorUpgradeTopic";
+ private static final String QUEUE_NAME = "myUpgradeQueue";
+ private static final String PRIORITY_QUEUE_NAME = "myPriorityQueue";
+ private static final String QUEUE_WITH_DLQ_NAME = "myQueueWithDLQ";
+
+ @BeforeClass
+ public static void verifyClient()
{
- try
- {
- _restTestHelper.tearDown();
- }
- finally
- {
- try
- {
- super.tearDown();
- }
- finally
- {
- FileUtils.delete(new File(_storeLocation), true);
- }
- }
+ assumeThat(System.getProperty("virtualhostnode.type", "BDB"), is(equalTo("BDB")));
+ assumeThat(getProtocol(), is(not(equalTo(Protocol.AMQP_1_0))));
}
/**
@@ -131,93 +106,106 @@ public class BDBUpgradeTest extends QpidBrokerTestCase
* by monitoring message count while sending new messages to the topic and then
* consuming them.
*/
+ @Test
public void testSelectorDurability() throws Exception
{
- Connection con = getConnection();
- Queue queue;
+ TopicConnection connection = getTopicConnection();
try
{
- Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- queue = session.createQueue("BURL:direct:////clientid" + ":" + SELECTOR_SUB_NAME);
+ connection.start();
+
+ TopicSession session = connection.createTopicSession(true, Session.SESSION_TRANSACTED);
+ Topic topic = session.createTopic(SELECTOR_TOPIC_NAME);
+ TopicPublisher publisher = session.createPublisher(topic);
+
+ int index = ThreadLocalRandom.current().nextInt();
+ Message messageA = session.createTextMessage("A");
+ messageA.setIntProperty("ID", index);
+ messageA.setStringProperty("testprop", "false");
+ publisher.publish(messageA);
+
+ Message messageB = session.createTextMessage("B");
+ messageB.setIntProperty("ID", index);
+ messageB.setStringProperty("testprop", "true");
+ publisher.publish(messageB);
+
+ session.commit();
+
+ TopicSubscriber subscriber =
+ session.createDurableSubscriber(topic, SELECTOR_SUB_NAME, "testprop='true'", false);
+ Message migrated = subscriber.receive(getReceiveTimeout());
+ assertThat("Failed to receive migrated message", migrated, is(notNullValue()));
+
+ Message received = subscriber.receive(getReceiveTimeout());
+ session.commit();
+ assertThat("Failed to receive published message", received, is(notNullValue()));
+ assertThat("Message is not Text message", received, is(instanceOf(TextMessage.class)));
+ assertThat("Unexpected text", ((TextMessage) received).getText(), is(equalTo("B")));
+ assertThat("Unexpected index", received.getIntProperty("ID"), is(equalTo(index)));
+
+ session.close();
}
finally
{
- con.close();
+ connection.close();
}
-
- // Create a connection and start it
- TopicConnection connection = (TopicConnection) getConnection();
- connection.start();
-
- // Send messages which don't match and do match the selector, checking message count
- TopicSession pubSession = connection.createTopicSession(true, Session.SESSION_TRANSACTED);
- assertEquals("DurableSubscription backing queue should have 1 message on it initially",
- 1, getQueueDepth(queue.getQueueName()));
-
- Topic topic = pubSession.createTopic(SELECTOR_TOPIC_NAME);
- TopicPublisher publisher = pubSession.createPublisher(topic);
-
- publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "false");
- pubSession.commit();
- assertEquals("DurableSubscription backing queue should still have 1 message on it",
- 1, getQueueDepth(queue.getQueueName()));
-
- publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "true");
- pubSession.commit();
- assertEquals("DurableSubscription backing queue should now have 2 messages on it",
- 2, getQueueDepth(queue.getQueueName()));
-
- TopicSubscriber durSub = pubSession.createDurableSubscriber(topic, SELECTOR_SUB_NAME,"testprop='true'", false);
- Message m = durSub.receive(getReceiveTimeout());
- assertNotNull("Failed to receive an expected message", m);
- m = durSub.receive(getReceiveTimeout());
- assertNotNull("Failed to receive an expected message", m);
- pubSession.commit();
-
- pubSession.close();
}
/**
* Test that the DurableSubscription without selector was successfully
* transfered to the new store, and functions as expected with continued use.
*/
+ @Test
public void testDurableSubscriptionWithoutSelector() throws Exception
{
- Connection con = getConnection();
- Queue queue;
+ TopicConnection connection = getTopicConnection();
try
{
- Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- queue = session.createQueue("BURL:direct:////clientid" + ":" + SUB_NAME);
+ connection.start();
+
+ TopicSession session = connection.createTopicSession(true, Session.SESSION_TRANSACTED);
+
+ Topic topic = session.createTopic(TOPIC_NAME);
+ TopicPublisher publisher = session.createPublisher(topic);
+
+ int index = ThreadLocalRandom.current().nextInt();
+ Message messageA = session.createTextMessage("A");
+ messageA.setIntProperty("ID", index);
+ messageA.setStringProperty("testprop", "false");
+ publisher.publish(messageA);
+
+ Message messageB = session.createTextMessage("B");
+ messageB.setIntProperty("ID", index);
+ messageB.setStringProperty("testprop", "true");
+ publisher.publish(messageB);
+
+ session.commit();
+
+ TopicSubscriber subscriber = session.createDurableSubscriber(topic, SUB_NAME);
+ Message migrated = subscriber.receive(getReceiveTimeout());
+ assertThat("Failed to receive migrated message", migrated, is(notNullValue()));
+
+ Message receivedA = subscriber.receive(getReceiveTimeout());
+ session.commit();
+ assertThat("Failed to receive published message A", receivedA, is(notNullValue()));
+ assertThat("Message A is not Text message", receivedA, is(instanceOf(TextMessage.class)));
+ assertThat("Unexpected text for A", ((TextMessage) receivedA).getText(), is(equalTo("A")));
+ assertThat("Unexpected index", receivedA.getIntProperty("ID"), is(equalTo(index)));
+
+ Message receivedB = subscriber.receive(getReceiveTimeout());
+ session.commit();
+ assertThat("Failed to receive published message B", receivedB, is(notNullValue()));
+ assertThat("Message B is not Text message", receivedB, is(instanceOf(TextMessage.class)));
+ assertThat("Unexpected text for B", ((TextMessage) receivedB).getText(), is(equalTo("B")));
+ assertThat("Unexpected index for B", receivedB.getIntProperty("ID"), is(equalTo(index)));
+
+ session.commit();
+ session.close();
}
finally
{
- con.close();
+ connection.close();
}
- // Create a connection and start it
- TopicConnection connection = (TopicConnection) getConnection();
- connection.start();
-
- // Send new message matching the topic, checking message count
- TopicSession session = connection.createTopicSession(true, Session.SESSION_TRANSACTED);
- assertEquals("DurableSubscription backing queue should have 1 message on it initially",
- 1, getQueueDepth(queue.getQueueName()));
- Topic topic = session.createTopic(TOPIC_NAME);
- TopicPublisher publisher = session.createPublisher(topic);
-
- publishMessages(session, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "indifferent");
- session.commit();
- assertEquals("DurableSubscription backing queue should now have 2 messages on it",
- 2, getQueueDepth(queue.getQueueName()));
-
- TopicSubscriber durSub = session.createDurableSubscriber(topic, SUB_NAME);
- Message m = durSub.receive(getReceiveTimeout());
- assertNotNull("Failed to receive an expected message", m);
- m = durSub.receive(getReceiveTimeout());
- assertNotNull("Failed to receive an expected message", m);
-
- session.commit();
- session.close();
}
/**
@@ -225,51 +213,61 @@ public class BDBUpgradeTest extends QpidBrokerTestCase
* detected and set as being exclusive during the upgrade process, and that the
* regular queue was not.
*/
+ @Test
public void testQueueExclusivity() throws Exception
{
Map<String, Object> result = getQueueAttributes(QUEUE_NAME);
- ExclusivityPolicy exclusivityPolicy =
- ExclusivityPolicy.valueOf((String) result.get(org.apache.qpid.server.model.Queue.EXCLUSIVE));
- assertEquals("Queue should not have been marked as Exclusive during upgrade",
- ExclusivityPolicy.NONE, exclusivityPolicy);
+ assertThat("Exclusive policy attribute is not found",
+ result.get(org.apache.qpid.server.model.Queue.EXCLUSIVE),
+ is(notNullValue()));
+ assertThat("Queue should not have been marked as Exclusive during upgrade",
+ ExclusivityPolicy.valueOf(String.valueOf(result.get(org.apache.qpid.server.model.Queue.EXCLUSIVE))),
+ is(equalTo(ExclusivityPolicy.NONE)));
result = getQueueAttributes("clientid" + ":" + SUB_NAME);
- exclusivityPolicy =
- ExclusivityPolicy.valueOf((String) result.get(org.apache.qpid.server.model.Queue.EXCLUSIVE));
- assertTrue("DurableSubscription backing queue should have been marked as Exclusive during upgrade",
- exclusivityPolicy != ExclusivityPolicy.NONE);
+ assertThat("Exclusive policy attribute is not found",
+ result.get(org.apache.qpid.server.model.Queue.EXCLUSIVE),
+ is(notNullValue()));
+ assertThat("DurableSubscription backing queue should have been marked as Exclusive during upgrade",
+ ExclusivityPolicy.valueOf(String.valueOf(result.get(org.apache.qpid.server.model.Queue.EXCLUSIVE))),
+ is(not(equalTo(ExclusivityPolicy.NONE))));
}
/**
* Test that the upgraded queue continues to function properly when used
* for persistent messaging and restarting the broker.
- *
+ * <p>
* Sends the new messages to the queue BEFORE consuming those which were
* sent before the upgrade. In doing so, this also serves to test that
* the queue bindings were successfully transitioned during the upgrade.
*/
- public void testBindingAndMessageDurabability() throws Exception
+ @Test
+ public void testBindingAndMessageDurability() throws Exception
{
- // Create a connection and start it
- TopicConnection connection = (TopicConnection) getConnection();
- connection.start();
+ Connection connection = getConnection();
+ try
+ {
+ connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = session.createQueue(QUEUE_NAME);
- MessageProducer messageProducer = session.createProducer(queue);
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(QUEUE_NAME);
- // Send a new message
- sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 256*1024, 1);
+ sendMessages(connection, queue, 1);
- session.close();
+ session.close();
- // Restart the broker
- restartDefaultBroker();
+ // Restart
+ getBrokerAdmin().restart();
- // Drain the queue of all messages
- connection = (TopicConnection) getConnection();
- connection.start();
- consumeQueueMessages(connection, true);
+ // Drain the queue of all messages
+ connection = getConnection();
+ connection.start();
+ consumeQueueMessages(connection, true);
+ }
+ finally
+ {
+ connection.close();
+ }
}
/**
@@ -277,39 +275,22 @@ public class BDBUpgradeTest extends QpidBrokerTestCase
* the broker are properly received following update of the MetaData and
* Content entries during the store upgrade process.
*/
+ @Test
public void testConsumptionOfUpgradedMessages() throws Exception
{
// Create a connection and start it
Connection connection = getConnection();
- connection.start();
-
- consumeDurableSubscriptionMessages(connection, true);
- consumeDurableSubscriptionMessages(connection, false);
- consumeQueueMessages(connection, false);
- }
-
- /**
- * Tests store migration containing messages for non-existing queue.
- *
- * @throws Exception
- */
- public void testMigrationOfMessagesForNonDurableQueues() throws Exception
- {
- // Create a connection and start it
- Connection connection = getConnection();
- connection.start();
-
- // consume a message for non-existing store
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = session.createQueue(NON_DURABLE_QUEUE_NAME);
- MessageConsumer messageConsumer = session.createConsumer(queue);
+ try
+ {
+ connection.start();
- for (int i = 1; i <= 3; i++)
+ consumeDurableSubscriptionMessages(connection, true);
+ consumeDurableSubscriptionMessages(connection, false);
+ consumeQueueMessages(connection, false);
+ }
+ finally
{
- Message message = messageConsumer.receive(getReceiveTimeout());
- assertNotNull("Message was not migrated!", message);
- assertTrue("Unexpected message received!", message instanceof TextMessage);
- assertEquals("ID property did not match", i, message.getIntProperty("ID"));
+ connection.close();
}
}
@@ -318,78 +299,90 @@ public class BDBUpgradeTest extends QpidBrokerTestCase
* such that sending messages with priorities out-of-order and then consuming
* them gets the messages back in priority order.
*/
+ @Test
public void testPriorityQueue() throws Exception
{
- // Create a connection and start it
Connection connection = getConnection();
- connection.start();
-
- // send some messages to the priority queue
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = session.createQueue(PRIORITY_QUEUE_NAME);
- MessageProducer producer = session.createProducer(queue);
-
- producer.setPriority(4);
- producer.send(createMessage(1, false, session, producer));
- producer.setPriority(1);
- producer.send(createMessage(2, false, session, producer));
- producer.setPriority(9);
- producer.send(createMessage(3, false, session, producer));
- session.close();
-
- //consume the messages, expected order: msg 3, msg 1, msg 2.
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createConsumer(queue);
-
- Message msg = consumer.receive(getReceiveTimeout());
- assertNotNull("expected message was not received", msg);
- assertEquals(3, msg.getIntProperty("msg"));
- msg = consumer.receive(getReceiveTimeout());
- assertNotNull("expected message was not received", msg);
- assertEquals(1, msg.getIntProperty("msg"));
- msg = consumer.receive(getReceiveTimeout());
- assertNotNull("expected message was not received", msg);
- assertEquals(2, msg.getIntProperty("msg"));
+ try
+ {
+ connection.start();
+
+ // send some messages to the priority queue
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(PRIORITY_QUEUE_NAME);
+ MessageProducer producer = session.createProducer(queue);
+
+ producer.send(session.createTextMessage("A"), DeliveryMode.PERSISTENT, 4, -1);
+ producer.send(session.createTextMessage("B"), DeliveryMode.PERSISTENT, 1, -1);
+ producer.send(session.createTextMessage("C"), DeliveryMode.PERSISTENT, 9, -1);
+ session.close();
+
+ //consume the messages, expected order: C, A, B.
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ Message message1 = consumer.receive(getReceiveTimeout());
+ assertThat("expected message was not received", message1, is(instanceOf(TextMessage.class)));
+ assertThat(((TextMessage) message1).getText(), is(equalTo("C")));
+ Message message2 = consumer.receive(getReceiveTimeout());
+ assertThat("expected message was not received", message2, is(instanceOf(TextMessage.class)));
+ assertThat(((TextMessage) message2).getText(), is(equalTo("A")));
+ Message message3 = consumer.receive(getReceiveTimeout());
+ assertThat("expected message was not received", message3, is(instanceOf(TextMessage.class)));
+ assertThat(((TextMessage) message3).getText(), is(equalTo("B")));
+ }
+ finally
+ {
+ connection.close();
+ }
}
/**
* Test that the queue configured to have a DLQ was recovered and has the alternate exchange
* and max delivery count, the DLE exists, the DLQ exists with no max delivery count, the
* DLQ is bound to the DLE, and that the DLQ does not itself have a DLQ.
- *
+ * <p>
* DLQs are NOT enabled at the virtualhost level, we are testing recovery of the arguments
* that turned it on for this specific queue.
*/
+ @Test
public void testRecoveryOfQueueWithDLQ() throws Exception
{
//verify the DLE exchange exists, has the expected type, and a single binding for the DLQ
Map<String, Object> exchangeAttributes = getExchangeAttributes(QUEUE_WITH_DLQ_NAME + "_DLE");
- assertEquals("Wrong exchange type", "fanout", (String) exchangeAttributes.get(Exchange.TYPE));
+ assertThat("Wrong exchange type",
+ exchangeAttributes.get(Exchange.TYPE),
+ is(equalTo("org.apache.qpid.FanoutExchange")));
+
+ @SuppressWarnings("unchecked")
Collection<Map<String, Object>> bindings = (Collection<Map<String, Object>>) exchangeAttributes.get("bindings");
- assertEquals(1, bindings.size());
- for(Map<String, Object> binding : bindings)
+ assertThat(bindings.size(), is(equalTo(1)));
+ for (Map<String, Object> binding : bindings)
{
String bindingKey = (String) binding.get("bindingKey");
String queueName = (String) binding.get("destination");
//Because its a fanout exchange, we just return a single '*' key with all bound queues
- assertEquals("unexpected binding key", "dlq", bindingKey);
- assertEquals("unexpected queue name", QUEUE_WITH_DLQ_NAME + "_DLQ", queueName);
+ assertThat("unexpected binding key", bindingKey, is(equalTo("dlq")));
+ assertThat("unexpected queue name", queueName, is(equalTo(QUEUE_WITH_DLQ_NAME + "_DLQ")));
}
//verify the queue exists, has the expected alternate exchange and max delivery count
Map<String, Object> queueAttributes = getQueueAttributes(QUEUE_WITH_DLQ_NAME);
- assertEquals("Queue does not have the expected AlternateExchange",
- new HashMap<>(Collections.singletonMap(AlternateBinding.DESTINATION, QUEUE_WITH_DLQ_NAME + "_DLE")),
- new HashMap<>(((Map<String, Object>) queueAttributes.get(Exchange.ALTERNATE_BINDING))));
- assertEquals("Unexpected maximum delivery count", 2,
- ((Number) queueAttributes.get(org.apache.qpid.server.model.Queue.MAXIMUM_DELIVERY_ATTEMPTS)).intValue());
+ assertThat("Queue does not have the expected AlternateExchange",
+ queueAttributes.get(Exchange.ALTERNATE_BINDING),
+ is(equalTo(Collections.singletonMap(AlternateBinding.DESTINATION, QUEUE_WITH_DLQ_NAME + "_DLE"))));
+
+ assertThat("Unexpected maximum delivery count",
+ ((Number) queueAttributes.get(org.apache.qpid.server.model.Queue.MAXIMUM_DELIVERY_ATTEMPTS)).intValue(),
+ is(equalTo(2)));
Map<String, Object> dlQueueAttributes = getQueueAttributes(QUEUE_WITH_DLQ_NAME + "_DLQ");
- assertNull("Queue should not have an AlternateExchange",
- dlQueueAttributes.get(org.apache.qpid.server.model.Queue.ALTERNATE_BINDING));
- assertEquals("Unexpected maximum delivery count", 0,
- ((Number) dlQueueAttributes.get(org.apache.qpid.server.model.Queue.MAXIMUM_DELIVERY_ATTEMPTS)).intValue());
+ assertThat("Queue should not have an AlternateExchange",
+ dlQueueAttributes.get(org.apache.qpid.server.model.Queue.ALTERNATE_BINDING), is(nullValue()));
+ assertThat("Unexpected maximum delivery count",
+ ((Number) dlQueueAttributes.get(org.apache.qpid.server.model.Queue.MAXIMUM_DELIVERY_ATTEMPTS)).intValue(),
+ is(equalTo(0)));
try
{
@@ -397,33 +390,26 @@ public class BDBUpgradeTest extends QpidBrokerTestCase
getQueueAttributes(queueName);
fail("A DLQ should not exist for the DLQ itself");
}
- catch (FileNotFoundException e)
+ catch (AmqpManagementFacade.OperationUnsuccessfulException e)
{
- // pass
+ assertThat(e.getStatusCode(), is(equalTo(404)));
}
}
- private Map<String, Object> getExchangeAttributes(final String exchangeName) throws IOException
+ @Override
+ String getOldStoreResourcePath()
{
- String exchangeUrl = String.format("exchange/%1$s/%1$s/%2$s",
- TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST,
- exchangeName);
- return _restTestHelper.getJsonAsMap(exchangeUrl);
+ return "upgrade/bdbstore-v4/test-store/00000000.jdb";
}
- private Map<String, Object> getQueueAttributes(final String queueName) throws IOException
+ private Map<String, Object> getExchangeAttributes(final String exchangeName) throws Exception
{
- String queueUrl = String.format("queue/%1$s/%1$s/%2$s",
- TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST,
- queueName);
- return _restTestHelper.getJsonAsMap(queueUrl);
+ return readEntityUsingAmqpManagement(exchangeName, "org.apache.qpid.Exchange", false);
}
- private long getQueueDepth(final String queueName) throws IOException
+ private Map<String, Object> getQueueAttributes(final String queueName) throws Exception
{
- Map<String, Object> queueAttributes = getQueueAttributes(queueName);
- Map<String, Object> statistics = (Map<String, Object>) queueAttributes.get("statistics");
- return ((Number) statistics.get("queueDepthMessages")).longValue();
+ return readEntityUsingAmqpManagement(queueName, "org.apache.qpid.Queue", false);
}
private void consumeDurableSubscriptionMessages(Connection connection, boolean selector) throws Exception
@@ -432,10 +418,10 @@ public class BDBUpgradeTest extends QpidBrokerTestCase
Topic topic = null;
TopicSubscriber durSub = null;
- if(selector)
+ if (selector)
{
topic = session.createTopic(SELECTOR_TOPIC_NAME);
- durSub = session.createDurableSubscriber(topic, SELECTOR_SUB_NAME,"testprop='true'", false);
+ durSub = session.createDurableSubscriber(topic, SELECTOR_SUB_NAME, "testprop='true'", false);
}
else
{
@@ -443,20 +429,21 @@ public class BDBUpgradeTest extends QpidBrokerTestCase
durSub = session.createDurableSubscriber(topic, SUB_NAME);
}
-
// Retrieve the matching message
Message m = durSub.receive(getReceiveTimeout());
- assertNotNull("Failed to receive an expected message", m);
- if(selector)
+ assertThat("Failed to receive an expected message", m, is(notNullValue()));
+ if (selector)
{
- assertEquals("Selector property did not match", "true", m.getStringProperty("testprop"));
+ assertThat("Selector property did not match", m.getStringProperty("testprop"), is(equalTo("true")));
}
- assertEquals("ID property did not match", 1, m.getIntProperty("ID"));
- assertEquals("Message content was not as expected", generateString(1024) , ((TextMessage)m).getText());
+ assertThat("ID property did not match", m.getIntProperty("ID"), is(equalTo(1)));
+ assertThat("Message content was not as expected",
+ ((TextMessage) m).getText(),
+ is(equalTo(generateString(1024))));
// Verify that no more messages are received
- m = durSub.receive(1000);
- assertNull("No more messages should have been recieved", m);
+ m = durSub.receive(getReceiveTimeout());
+ assertThat("No more messages should have been recieved", m, is(nullValue()));
durSub.close();
session.close();
@@ -471,46 +458,41 @@ public class BDBUpgradeTest extends QpidBrokerTestCase
Message m;
// Retrieve the initial pre-upgrade messages
- for (int i=1; i <= 5 ; i++)
+ for (int i = 1; i <= 5; i++)
{
m = consumer.receive(getReceiveTimeout());
- assertNotNull("Failed to receive an expected message", m);
- assertEquals("ID property did not match", i, m.getIntProperty("ID"));
- assertEquals("Message content was not as expected", STRING_1024_256, ((TextMessage)m).getText());
+ assertThat("Failed to receive an expected message", m, is(notNullValue()));
+ assertThat("ID property did not match", m.getIntProperty("ID"), is(equalTo(i)));
+ assertThat("Message content was not as expected",
+ ((TextMessage) m).getText(),
+ is(equalTo(STRING_1024_256)));
}
- for (int i=1; i <= 5 ; i++)
+ for (int i = 1; i <= 5; i++)
{
m = consumer.receive(getReceiveTimeout());
- assertNotNull("Failed to receive an expected message", m);
- assertEquals("ID property did not match", i, m.getIntProperty("ID"));
- assertEquals("Message content was not as expected", STRING_1024, ((TextMessage)m).getText());
+ assertThat("Failed to receive an expected message", m, is(notNullValue()));
+ assertThat("ID property did not match", m.getIntProperty("ID"), is(equalTo(i)));
+ assertThat("Message content was not as expected", ((TextMessage) m).getText(), is((equalTo(STRING_1024))));
}
- if(extraMessage)
+ if (extraMessage)
{
//verify that the extra message is received
m = consumer.receive(getReceiveTimeout());
- assertNotNull("Failed to receive an expected message", m);
- assertEquals("ID property did not match", 1, m.getIntProperty("ID"));
- assertEquals("Message content was not as expected", STRING_1024_256, ((TextMessage)m).getText());
+ assertThat("Failed to receive an expected message", m, is(notNullValue()));
+ assertThat("ID property did not match", m.getIntProperty(INDEX), is(equalTo(0)));
+ }
+ else
+ {
+ // Verify that no more messages are received
+ m = consumer.receive(getReceiveTimeout());
+ assertThat("No more messages should have been recieved", m, is(nullValue()));
}
-
- // Verify that no more messages are received
- m = consumer.receive(getReceiveTimeout());
- assertNull("No more messages should have been recieved", m);
consumer.close();
session.close();
}
- private Message createMessage(int msgId, boolean first, Session producerSession, MessageProducer producer) throws JMSException
- {
- Message send = producerSession.createTextMessage("Message: " + msgId);
- send.setIntProperty("msg", msgId);
-
- return send;
- }
-
/**
* Generates a string of a given length consisting of the sequence 0,1,2,..,9,0,1,2.
*
@@ -528,26 +510,4 @@ public class BDBUpgradeTest extends QpidBrokerTestCase
return new String(chars);
}
- private static void sendMessages(Session session, MessageProducer messageProducer,
- Destination dest, int deliveryMode, int length, int numMesages) throws JMSException
- {
- for (int i = 1; i <= numMesages; i++)
- {
- Message message = session.createTextMessage(generateString(length));
- message.setIntProperty("ID", i);
- messageProducer.send(message, deliveryMode, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
- }
- }
-
- private static void publishMessages(Session session, TopicPublisher publisher,
- Destination dest, int deliveryMode, int length, int numMesages, String selectorProperty) throws JMSException
- {
- for (int i = 1; i <= numMesages; i++)
- {
- Message message = session.createTextMessage(generateString(length));
- message.setIntProperty("ID", i);
- message.setStringProperty("testprop", selectorProperty);
- publisher.publish(message, deliveryMode, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3a6893e4/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/UpgradeTestBase.java
----------------------------------------------------------------------
diff --git a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/UpgradeTestBase.java b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/UpgradeTestBase.java
new file mode 100644
index 0000000..ae2d373
--- /dev/null
+++ b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/UpgradeTestBase.java
@@ -0,0 +1,88 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.Map;
+
+import javax.jms.Connection;
+
+import org.junit.Before;
+
+import org.apache.qpid.server.util.FileUtils;
+import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBVirtualHostNode;
+import org.apache.qpid.systests.JmsTestBase;
+
+public abstract class UpgradeTestBase extends JmsTestBase
+{
+ @Before
+ public void restartWithOldStore() throws Exception
+ {
+ Connection connection = getConnectionBuilder().setVirtualHost("$management").build();
+ try
+ {
+ connection.start();
+ Map<String, Object> attributes =
+ readEntityUsingAmqpManagement(getVirtualHostName(), "org.apache.qpid.VirtualHostNode", false, connection);
+ String storePath = (String) attributes.get(BDBVirtualHostNode.STORE_PATH);
+
+ updateEntityUsingAmqpManagement(getVirtualHostName(),
+ "org.apache.qpid.VirtualHostNode",
+ Collections.singletonMap("desiredState", "STOPPED"), connection);
+ copyStore(new File(storePath));
+ updateEntityUsingAmqpManagement(getVirtualHostName(),
+ "org.apache.qpid.VirtualHostNode",
+ Collections.singletonMap("desiredState", "ACTIVE"), connection);
+ }
+ finally
+ {
+ connection.close();
+ }
+ }
+
+ private void copyStore(final File directory) throws IOException
+ {
+ if (directory.exists() && directory.isDirectory())
+ {
+ FileUtils.delete(directory, true);
+ }
+ if (directory.mkdirs())
+ {
+ try (InputStream src = getClass().getClassLoader()
+ .getResourceAsStream(getOldStoreResourcePath()))
+ {
+ FileUtils.copy(src, new File(directory, "00000000.jdb"));
+ }
+ }
+ else
+ {
+ fail(String.format("Cannot copy store file into '%s'", directory.getAbsolutePath()));
+ }
+ }
+
+
+ abstract String getOldStoreResourcePath();
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3a6893e4/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostNodeRestTest.java
----------------------------------------------------------------------
diff --git a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostNodeRestTest.java b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostNodeRestTest.java
deleted file mode 100644
index 0fde56f..0000000
--- a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostNodeRestTest.java
+++ /dev/null
@@ -1,471 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.replication;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import javax.servlet.http.HttpServletResponse;
-
-import com.sleepycat.je.Durability;
-import com.sleepycat.je.EnvironmentConfig;
-import com.sleepycat.je.rep.NoConsistencyRequiredPolicy;
-import com.sleepycat.je.rep.ReplicatedEnvironment;
-import com.sleepycat.je.rep.ReplicationConfig;
-
-import org.apache.qpid.server.management.plugin.servlet.rest.AbstractServlet;
-import org.apache.qpid.server.model.RemoteReplicationNode;
-import org.apache.qpid.server.model.State;
-import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.model.VirtualHostNode;
-import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost;
-import org.apache.qpid.server.virtualhostnode.AbstractVirtualHostNode;
-import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode;
-import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
-import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNodeImpl;
-import org.apache.qpid.systest.rest.Asserts;
-import org.apache.qpid.systest.rest.QpidRestTestCase;
-import org.apache.qpid.test.utils.TestBrokerConfiguration;
-
-public class BDBHAVirtualHostNodeRestTest extends QpidRestTestCase
-{
- private static final String NODE1 = "node1";
- private static final String NODE2 = "node2";
- private static final String NODE3 = "node3";
-
- private int _node1HaPort;
- private int _node2HaPort;
- private int _node3HaPort;
-
- private String _hostName;
- private String _baseNodeRestUrl;
-
- @Override
- public void setUp() throws Exception
- {
- setTestSystemProperty(ReplicatedEnvironmentFacade.REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME, "1000");
- setTestSystemProperty(ReplicatedEnvironmentFacade.REMOTE_NODE_MONITOR_TIMEOUT_PROPERTY_NAME, "1000");
-
- super.setUp();
- _hostName = getTestName();
- _baseNodeRestUrl = "virtualhostnode/";
-
- _node1HaPort = findFreePort();
- _node2HaPort = getNextAvailable(_node1HaPort + 1);
- _node3HaPort = getNextAvailable(_node2HaPort + 1);
-
-
- }
-
- @Override
- protected void customizeConfiguration() throws Exception
- {
- super.customizeConfiguration();
- TestBrokerConfiguration config = getDefaultBrokerConfiguration();
- config.removeObjectConfiguration(VirtualHostNode.class, TEST2_VIRTUALHOST);
- config.removeObjectConfiguration(VirtualHostNode.class, TEST3_VIRTUALHOST);
- }
-
- public void testCreate3NodeGroup() throws Exception
- {
- createHANode(NODE1, _node1HaPort, _node1HaPort);
- assertNode(NODE1, _node1HaPort, _node1HaPort, NODE1);
- createHANode(NODE2, _node2HaPort, _node1HaPort);
- assertNode(NODE2, _node2HaPort, _node1HaPort, NODE1);
- createHANode(NODE3, _node3HaPort, _node1HaPort);
- assertNode(NODE3, _node3HaPort, _node1HaPort, NODE1);
- assertRemoteNodes(NODE1, NODE2, NODE3);
- }
-
- public void testMutateStateOfOneNode() throws Exception
- {
- createHANode(NODE1, _node1HaPort, _node1HaPort);
- createHANode(NODE2, _node2HaPort, _node1HaPort);
- createHANode(NODE3, _node3HaPort, _node1HaPort);
-
- String node1Url = _baseNodeRestUrl + NODE1;
- String node2Url = _baseNodeRestUrl + NODE2;
- String node3Url = _baseNodeRestUrl + NODE3;
-
- assertActualAndDesiredStates(node1Url, "ACTIVE", "ACTIVE");
- assertActualAndDesiredStates(node2Url, "ACTIVE", "ACTIVE");
- assertActualAndDesiredStates(node3Url, "ACTIVE", "ACTIVE");
-
- // verify that remote nodes for node1 are created and their state is set to ACTIVE
- _restTestHelper.waitForAttributeChanged("remotereplicationnode/" + NODE2 + "/" + NODE1,
- BDBHARemoteReplicationNode.STATE, "ACTIVE");
- _restTestHelper.waitForAttributeChanged("remotereplicationnode/" + NODE3 + "/" + NODE1,
- BDBHARemoteReplicationNode.STATE, "ACTIVE");
-
- mutateDesiredState(node1Url, "STOPPED");
-
- assertActualAndDesiredStates(node1Url, "STOPPED", "STOPPED");
- assertActualAndDesiredStates(node2Url, "ACTIVE", "ACTIVE");
- assertActualAndDesiredStates(node3Url, "ACTIVE", "ACTIVE");
-
- // verify that remote node state fro node1 is changed to UNAVAILABLE
- _restTestHelper.waitForAttributeChanged("remotereplicationnode/" + NODE2 + "/" + NODE1,
- BDBHARemoteReplicationNode.STATE, "UNAVAILABLE");
- _restTestHelper.waitForAttributeChanged("remotereplicationnode/" + NODE3 + "/" + NODE1,
- BDBHARemoteReplicationNode.STATE, "UNAVAILABLE");
-
- List<Map<String, Object>> remoteNodes = getRestTestHelper().getJsonAsList("remotereplicationnode/" + NODE2);
- assertEquals("Unexpected number of remote nodes on " + NODE2, 2, remoteNodes.size());
-
- Map<String, Object> remoteNode1 = findRemoteNodeByName(remoteNodes, NODE1);
-
- assertEquals("Node 1 observed from node 2 is in the wrong state",
- "UNAVAILABLE", remoteNode1.get(BDBHARemoteReplicationNode.STATE));
- assertEquals("Node 1 observed from node 2 has the wrong role",
- "UNREACHABLE", remoteNode1.get(BDBHARemoteReplicationNode.ROLE));
-
- }
-
- public void testNewMasterElectedWhenVirtualHostIsStopped() throws Exception
- {
- createHANode(NODE1, _node1HaPort, _node1HaPort);
- createHANode(NODE2, _node2HaPort, _node1HaPort);
- createHANode(NODE3, _node3HaPort, _node1HaPort);
-
- String node1Url = _baseNodeRestUrl + NODE1;
- String node2Url = _baseNodeRestUrl + NODE2;
- String node3Url = _baseNodeRestUrl + NODE3;
-
- assertActualAndDesiredStates(node1Url, "ACTIVE", "ACTIVE");
- assertActualAndDesiredStates(node2Url, "ACTIVE", "ACTIVE");
- assertActualAndDesiredStates(node3Url, "ACTIVE", "ACTIVE");
-
- // Put virtualhost in STOPPED state
- String virtualHostRestUrl = "virtualhost/" + NODE1 + "/" + _hostName;
- assertActualAndDesiredStates(virtualHostRestUrl, "ACTIVE", "ACTIVE");
- mutateDesiredState(virtualHostRestUrl, "STOPPED");
- assertActualAndDesiredStates(virtualHostRestUrl, "STOPPED", "STOPPED");
-
- // Now stop node 1 to cause an election between nodes 2 & 3
- mutateDesiredState(node1Url, "STOPPED");
- assertActualAndDesiredStates(node1Url, "STOPPED", "STOPPED");
-
- Map<String, Object> newMasterData = awaitNewMaster(node2Url, node3Url);
-
- //Check the virtual host of the new master is in the stopped state
- String newMasterVirtualHostRestUrl = "virtualhost/" + newMasterData.get(BDBHAVirtualHostNode.NAME) + "/" + _hostName;
- assertActualAndDesiredStates(newMasterVirtualHostRestUrl, "STOPPED", "STOPPED");
- }
-
- public void testDeleteReplicaNode() throws Exception
- {
- createHANode(NODE1, _node1HaPort, _node1HaPort);
- createHANode(NODE2, _node2HaPort, _node1HaPort);
- createHANode(NODE3, _node3HaPort, _node1HaPort);
-
- assertRemoteNodes(NODE1, NODE2, NODE3);
-
- List<Map<String,Object>> data = getRestTestHelper().getJsonAsList("remotereplicationnode/" + NODE1);
- assertEquals("Unexpected number of remote nodes on " + NODE1, 2, data.size());
-
- getRestTestHelper().submitRequest(_baseNodeRestUrl + NODE2, "DELETE", HttpServletResponse.SC_OK);
-
- int counter = 0;
- while (data.size() != 1 && counter<50)
- {
- data = getRestTestHelper().getJsonAsList("remotereplicationnode/" + NODE1);
- if (data.size() != 1)
- {
- Thread.sleep(100l);
- }
- counter++;
- }
- assertEquals("Unexpected number of remote nodes on " + NODE1, 1, data.size());
- }
-
- public void testDeleteMasterNode() throws Exception
- {
- createHANode(NODE1, _node1HaPort, _node1HaPort);
- createHANode(NODE2, _node2HaPort, _node1HaPort);
- createHANode(NODE3, _node3HaPort, _node1HaPort);
-
- assertNode(NODE1, _node1HaPort, _node1HaPort, NODE1);
- assertRemoteNodes(NODE1, NODE2, NODE3);
-
- // change priority to ensure that Node2 becomes a master
- getRestTestHelper().submitRequest(_baseNodeRestUrl + NODE2,
- "PUT",
- Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.PRIORITY, 100),
- HttpServletResponse.SC_OK);
-
- List<Map<String,Object>> data = getRestTestHelper().getJsonAsList("remotereplicationnode/" + NODE2);
- assertEquals("Unexpected number of remote nodes on " + NODE2, 2, data.size());
-
- // delete master
- getRestTestHelper().submitRequest(_baseNodeRestUrl + NODE1, "DELETE", HttpServletResponse.SC_OK);
-
- // wait for new master
- _restTestHelper.waitForAttributeChanged(_baseNodeRestUrl + NODE2 + "?depth=0", BDBHAVirtualHostNode.ROLE, "MASTER");
-
- // delete remote node
- getRestTestHelper().submitRequest("remotereplicationnode/" + NODE2 + "/" + NODE1, "DELETE", HttpServletResponse.SC_OK);
-
- int counter = 0;
- while (data.size() != 1 && counter<50)
- {
- data = getRestTestHelper().getJsonAsList("remotereplicationnode/" + NODE2);
- if (data.size() != 1)
- {
- Thread.sleep(100l);
- }
- counter++;
- }
- assertEquals("Unexpected number of remote nodes on " + NODE2, 1, data.size());
- }
-
- public void testIntruderBDBHAVHNNotAllowedToConnect() throws Exception
- {
- createHANode(NODE1, _node1HaPort, _node1HaPort);
- assertNode(NODE1, _node1HaPort, _node1HaPort, NODE1);
-
- // add permitted node
- Map<String, Object> node3Data = createNodeAttributeMap(NODE3, _node3HaPort, _node1HaPort);
- getRestTestHelper().submitRequest(_baseNodeRestUrl + NODE3, "PUT", node3Data, HttpServletResponse.SC_CREATED);
- assertNode(NODE3, _node3HaPort, _node1HaPort, NODE1);
- assertRemoteNodes(NODE1, NODE3);
-
- int intruderPort = getNextAvailable(_node3HaPort + 1);
-
- // try to add not permitted node
- Map<String, Object> nodeData = createNodeAttributeMap(NODE2, intruderPort, _node1HaPort);
- getRestTestHelper().submitRequest(_baseNodeRestUrl + NODE2, "PUT", nodeData, AbstractServlet.SC_UNPROCESSABLE_ENTITY);
-
- assertRemoteNodes(NODE1, NODE3);
- }
-
- public void testIntruderProtection() throws Exception
- {
- createHANode(NODE1, _node1HaPort, _node1HaPort);
- assertNode(NODE1, _node1HaPort, _node1HaPort, NODE1);
-
- Map<String,Object> nodeData = getRestTestHelper().getJsonAsMap(_baseNodeRestUrl + NODE1);
- String node1StorePath = (String)nodeData.get(BDBHAVirtualHostNode.STORE_PATH);
- long transactionId = ((Number)nodeData.get(BDBHAVirtualHostNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID)).longValue();
-
- // add permitted node
- Map<String, Object> node3Data = createNodeAttributeMap(NODE3, _node3HaPort, _node1HaPort);
- getRestTestHelper().submitRequest(_baseNodeRestUrl + NODE3, "PUT", node3Data, HttpServletResponse.SC_CREATED);
- assertNode(NODE3, _node3HaPort, _node1HaPort, NODE1);
- assertRemoteNodes(NODE1, NODE3);
-
- // Ensure PINGDB is created
- // in order to exclude hanging of environment
- // when environment.close is called whilst PINGDB is created.
- // On node joining, a record is updated in PINGDB
- // if lastTransactionId is incremented then node ping task was executed
- int counter = 0;
- long newTransactionId = transactionId;
- while(newTransactionId == transactionId && counter<50)
- {
- nodeData = getRestTestHelper().getJsonAsMap(_baseNodeRestUrl + NODE1);
- newTransactionId = ((Number)nodeData.get(BDBHAVirtualHostNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID)).longValue();
- if (newTransactionId != transactionId)
- {
- break;
- }
- counter++;
- Thread.sleep(100L);
- }
-
- //connect intruder node
- String nodeName = NODE2;
- String nodeHostPort = "localhost:" + getNextAvailable(_node3HaPort + 1);
- File environmentPathFile = new File(node1StorePath, nodeName);
- environmentPathFile.mkdirs();
- ReplicationConfig replicationConfig = new ReplicationConfig((String)nodeData.get(BDBHAVirtualHostNode.GROUP_NAME), nodeName, nodeHostPort);
- replicationConfig.setHelperHosts((String)nodeData.get(BDBHAVirtualHostNode.ADDRESS));
- replicationConfig.setConsistencyPolicy(NoConsistencyRequiredPolicy.NO_CONSISTENCY);
- EnvironmentConfig envConfig = new EnvironmentConfig();
- envConfig.setAllowCreate(true);
- envConfig.setTransactional(true);
- envConfig.setDurability(Durability.parse((String)nodeData.get(BDBHAVirtualHostNode.DURABILITY)));
-
- ReplicatedEnvironment intruder = null;
- try
- {
- intruder = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig);
- }
- finally
- {
- if (intruder != null)
- {
- intruder.close();
- }
- }
-
- _restTestHelper.waitForAttributeChanged(_baseNodeRestUrl + NODE1, VirtualHostNode.STATE, State.ERRORED.name());
- _restTestHelper.waitForAttributeChanged(_baseNodeRestUrl + NODE3, VirtualHostNode.STATE, State.ERRORED.name());
- }
-
- private void createHANode(String nodeName, int nodePort, int helperPort) throws Exception
- {
- Map<String, Object> nodeData = createNodeAttributeMap(nodeName, nodePort, helperPort);
-
- getRestTestHelper().submitRequest(_baseNodeRestUrl + nodeName, "PUT", nodeData, HttpServletResponse.SC_CREATED);
- String hostExpectedState = nodePort == helperPort ? State.ACTIVE.name(): State.UNAVAILABLE.name();
- _restTestHelper.waitForAttributeChanged("virtualhost/" + nodeName + "/" + _hostName, BDBHAVirtualHost.STATE, hostExpectedState);
- }
-
- private Map<String, Object> createNodeAttributeMap(String nodeName, int nodePort, int helperPort) throws Exception
- {
- Map<String, Object> nodeData = new HashMap<>();
- nodeData.put(BDBHAVirtualHostNode.NAME, nodeName);
- nodeData.put(BDBHAVirtualHostNode.TYPE, BDBHAVirtualHostNodeImpl.VIRTUAL_HOST_NODE_TYPE);
- nodeData.put(BDBHAVirtualHostNode.GROUP_NAME, _hostName);
- nodeData.put(BDBHAVirtualHostNode.ADDRESS, "localhost:" + nodePort);
- nodeData.put(BDBHAVirtualHostNode.HELPER_ADDRESS, "localhost:" + helperPort);
- if (nodePort != helperPort)
- {
- nodeData.put(BDBHAVirtualHostNode.HELPER_NODE_NAME, NODE1);
- }
-
- Map<String,String> context = new HashMap<>();
- nodeData.put(BDBHAVirtualHostNode.CONTEXT, context);
- if (nodePort == helperPort)
- {
- nodeData.put(BDBHAVirtualHostNode.PERMITTED_NODES, GroupCreator.getPermittedNodes("localhost", _node1HaPort, _node2HaPort, _node3HaPort));
- }
- String bluePrint = GroupCreator.getBlueprint();
- context.put(AbstractVirtualHostNode.VIRTUALHOST_BLUEPRINT_CONTEXT_VAR, bluePrint);
- return nodeData;
- }
-
- private void assertNode(String nodeName, int nodePort, int nodeHelperPort, String masterNode) throws Exception
- {
- boolean isMaster = nodeName.equals(masterNode);
- String expectedRole = isMaster? "MASTER" : "REPLICA";
- _restTestHelper.waitForAttributeChanged(_baseNodeRestUrl + nodeName + "?depth=0", BDBHAVirtualHostNode.ROLE, expectedRole);
-
- Map<String, Object> nodeData = getRestTestHelper().getJsonAsMap(_baseNodeRestUrl + nodeName + "?depth=0");
- assertEquals("Unexpected name", nodeName, nodeData.get(BDBHAVirtualHostNode.NAME));
- assertEquals("Unexpected type", BDBHAVirtualHostNodeImpl.VIRTUAL_HOST_NODE_TYPE, nodeData.get(BDBHAVirtualHostNode.TYPE));
- assertEquals("Unexpected address", "localhost:" + nodePort, nodeData.get(BDBHAVirtualHostNode.ADDRESS));
- assertEquals("Unexpected helper address", "localhost:" + nodeHelperPort, nodeData.get(BDBHAVirtualHostNode.HELPER_ADDRESS));
- assertEquals("Unexpected group name", _hostName, nodeData.get(BDBHAVirtualHostNode.GROUP_NAME));
- assertEquals("Unexpected role", expectedRole, nodeData.get(BDBHAVirtualHostNode.ROLE));
-
- Integer lastKnownTransactionId = (Integer) nodeData.get(BDBHAVirtualHostNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID);
- assertNotNull("Unexpected lastKnownReplicationId", lastKnownTransactionId);
- assertTrue("Unexpected lastKnownReplicationId " + lastKnownTransactionId, lastKnownTransactionId > 0);
-
- Long joinTime = (Long) nodeData.get(BDBHAVirtualHostNode.JOIN_TIME);
- assertNotNull("Unexpected joinTime", joinTime);
- assertTrue("Unexpected joinTime " + joinTime, joinTime > 0);
-
- if (isMaster)
- {
- _restTestHelper.waitForAttributeChanged("virtualhost/" + masterNode + "/" + _hostName + "?depth=0", VirtualHost.STATE, State.ACTIVE.name());
- }
-
- }
-
- private void assertRemoteNodes(String masterNode, String... replicaNodes) throws Exception
- {
- List<String> clusterNodes = new ArrayList<>(Arrays.asList(replicaNodes));
- clusterNodes.add(masterNode);
-
- for (String clusterNodeName : clusterNodes)
- {
- List<String> remotes = new ArrayList<>(clusterNodes);
- remotes.remove(clusterNodeName);
- for (String remote : remotes)
- {
- String remoteUrl = "remotereplicationnode/" + clusterNodeName + "/" + remote;
- String desiredNodeState = remote.equals(masterNode) ? "MASTER" : "REPLICA";
- _restTestHelper.waitForAttributeChanged(remoteUrl,
- node -> desiredNodeState.equals(node.get(
- BDBHARemoteReplicationNode.ROLE))
- && (Integer) node.get(BDBHAVirtualHostNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID) > 0
- && ((Number) node.get(BDBHAVirtualHostNode.JOIN_TIME)).longValue() > 0L);
- }
- }
- }
-
- private void assertActualAndDesiredStates(final String restUrl,
- final String expectedDesiredState,
- final String expectedActualState) throws IOException
- {
- Map<String, Object> objectData = getRestTestHelper().getJsonAsMap(restUrl);
- Asserts.assertActualAndDesiredState(expectedDesiredState, expectedActualState, objectData);
- }
-
- private void mutateDesiredState(final String restUrl, final String newState) throws IOException
- {
- Map<String, Object> newAttributes = new HashMap<>();
- newAttributes.put(VirtualHostNode.DESIRED_STATE, newState);
-
- getRestTestHelper().submitRequest(restUrl, "PUT", newAttributes, HttpServletResponse.SC_OK);
- }
-
- private Map<String, Object> findRemoteNodeByName(final List<Map<String, Object>> remoteNodes, final String nodeName)
- {
- Map<String, Object> foundNode = null;
- for (Map<String, Object> remoteNode : remoteNodes)
- {
- if (nodeName.equals(remoteNode.get(RemoteReplicationNode.NAME)))
- {
- foundNode = remoteNode;
- break;
- }
- }
- assertNotNull("Could not find node with name " + nodeName + " amongst remote nodes.");
- return foundNode;
- }
-
- private Map<String, Object> awaitNewMaster(final String... nodeUrls)
- throws IOException, InterruptedException
- {
- Map<String, Object> newMasterData = null;
- int counter = 0;
- while (newMasterData == null && counter < 50)
- {
- for(String nodeUrl: nodeUrls)
- {
- Map<String, Object> nodeData = getRestTestHelper().getJsonAsMap(nodeUrl);
- if ("MASTER".equals(nodeData.get(BDBHAVirtualHostNode.ROLE)))
- {
- newMasterData = nodeData;
- break;
- }
- }
- if (newMasterData == null)
- {
- Thread.sleep(100L);
- counter++;
- }
- }
- assertNotNull("Could not find new master", newMasterData);
- return newMasterData;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3a6893e4/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostRestTest.java
----------------------------------------------------------------------
diff --git a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostRestTest.java b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostRestTest.java
deleted file mode 100644
index 9cb5da6..0000000
--- a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostRestTest.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.replication;
-
-import static org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY;
-import static org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import javax.servlet.http.HttpServletResponse;
-
-import org.apache.qpid.server.model.Port;
-import org.apache.qpid.server.model.State;
-import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.model.VirtualHostNode;
-import org.apache.qpid.server.virtualhostnode.AbstractVirtualHostNode;
-import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
-import org.apache.qpid.systest.rest.Asserts;
-import org.apache.qpid.systest.rest.QpidRestTestCase;
-import org.apache.qpid.test.utils.TestBrokerConfiguration;
-import org.apache.qpid.server.util.FileUtils;
-
-public class BDBHAVirtualHostRestTest extends QpidRestTestCase
-{
- private String _hostName;
- private File _storeBaseDir;
- private int _nodeHaPort;
- private Object _nodeName;
- private String _virtualhostUrl;
- private String _bluePrint;
- private List<String> _permittedNodes;
- private String _address;
- private int _httpPort;
-
- @Override
- public void setUp() throws Exception
- {
- setTestSystemProperty(ReplicatedEnvironmentFacade.REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME, "1000");
- setTestSystemProperty(ReplicatedEnvironmentFacade.REMOTE_NODE_MONITOR_TIMEOUT_PROPERTY_NAME, "1000");
-
- _hostName = "ha";
- _nodeName = "node1";
- _storeBaseDir = new File(TMP_FOLDER, "store-" + _hostName + "-" + System.currentTimeMillis());
- _httpPort = findFreePort();
- _nodeHaPort = getNextAvailable(_httpPort + 1);
- _virtualhostUrl = "virtualhost/" + _nodeName + "/" + _hostName;
- _bluePrint = GroupCreator.getBlueprint();
- _permittedNodes = GroupCreator.getPermittedNodes("localhost", _nodeHaPort);
- _address = "localhost:" + _nodeHaPort;
- super.setUp();
- }
-
- @Override
- public void tearDown() throws Exception
- {
- try
- {
- super.tearDown();
- }
- finally
- {
- if (_storeBaseDir != null)
- {
- FileUtils.delete(_storeBaseDir, true);
- }
- }
- }
-
- @Override
- protected void customizeConfiguration() throws Exception
- {
- super.customizeConfiguration();
- TestBrokerConfiguration config = getDefaultBrokerConfiguration();
- config.removeObjectConfiguration(VirtualHostNode.class, TEST2_VIRTUALHOST);
- config.removeObjectConfiguration(VirtualHostNode.class, TEST3_VIRTUALHOST);
-
- Map<String, Object> nodeAttributes = new HashMap<String, Object>();
- nodeAttributes.put(BDBHAVirtualHostNode.NAME, _nodeName);
- nodeAttributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA");
- nodeAttributes.put(BDBHAVirtualHostNode.STORE_PATH, _storeBaseDir.getPath() + File.separator + _nodeName);
- nodeAttributes.put(BDBHAVirtualHostNode.GROUP_NAME, _hostName);
- nodeAttributes.put(BDBHAVirtualHostNode.ADDRESS, _address);
- nodeAttributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, "localhost:" + _nodeHaPort);
- nodeAttributes.put(BDBHAVirtualHostNode.HELPER_NODE_NAME, _nodeName);
-
- nodeAttributes.put(BDBHAVirtualHostNode.PERMITTED_NODES, _permittedNodes);
- Map<String, String> context = new HashMap<String,String>();
- context.put(AbstractVirtualHostNode.VIRTUALHOST_BLUEPRINT_CONTEXT_VAR, _bluePrint);
-
- nodeAttributes.put(BDBHAVirtualHostNode.CONTEXT, context);
- config.addObjectConfiguration(VirtualHostNode.class, nodeAttributes);
- config.setObjectAttribute(Port.class, TestBrokerConfiguration.ENTRY_NAME_HTTP_PORT, Port.PORT, _httpPort);
- }
-
- public void testSetLocalTransactionSynchronizationPolicy() throws Exception
- {
- Map<String, Object> hostAttributes = _restTestHelper.waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, State.ACTIVE.name());
- assertEquals("Unexpected synchronization policy before change", "SYNC", hostAttributes.get(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY));
-
- Map<String, Object> newPolicy = Collections.<String, Object>singletonMap(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, "NO_SYNC");
- getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newPolicy, HttpServletResponse.SC_OK);
-
- hostAttributes = getRestTestHelper().getJsonAsMap(_virtualhostUrl);
- assertEquals("Unexpected synchronization policy after change", "NO_SYNC", hostAttributes.get(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY));
- }
-
- public void testSetRemoteTransactionSynchronizationPolicy() throws Exception
- {
- Map<String, Object> hostAttributes = _restTestHelper.waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, State.ACTIVE.name());
- assertEquals("Unexpected synchronization policy before change", "NO_SYNC", hostAttributes.get(REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY));
-
- Map<String, Object> newPolicy = Collections.<String, Object>singletonMap(REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, "SYNC");
- getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newPolicy, HttpServletResponse.SC_OK);
-
- hostAttributes = getRestTestHelper().getJsonAsMap(_virtualhostUrl);
- assertEquals("Unexpected synchronization policy after change", "SYNC", hostAttributes.get(REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY));
- }
-
- public void testMutateState() throws Exception
- {
- _restTestHelper.waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, "ACTIVE");
- assertActualAndDesireStates(_virtualhostUrl, "ACTIVE", "ACTIVE");
-
- Map<String, Object> newAttributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, "STOPPED");
- getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK);
-
- _restTestHelper.waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, "STOPPED");
- assertActualAndDesireStates(_virtualhostUrl, "STOPPED", "STOPPED");
-
- newAttributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, "ACTIVE");
- getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK);
-
- _restTestHelper.waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, "ACTIVE");
- assertActualAndDesireStates(_virtualhostUrl, "ACTIVE", "ACTIVE");
- }
-
- private void assertActualAndDesireStates(final String restUrl,
- final String expectedDesiredState,
- final String expectedActualState) throws IOException
- {
- Map<String, Object> virtualhost = getRestTestHelper().getJsonAsMap(restUrl);
- Asserts.assertActualAndDesiredState(expectedDesiredState, expectedActualState, virtualhost);
- }
-
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org