You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2020/03/11 15:10:39 UTC
[activemq-artemis] branch master updated: ARTEMIS-2645 make CLI resources more test friendly
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 62fef18 ARTEMIS-2645 make CLI resources more test friendly
new df01463 This closes #3009
62fef18 is described below
commit 62fef18c65ac9f816a77dbfb31de873f458f0c8f
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Mon Mar 9 15:34:23 2020 -0500
ARTEMIS-2645 make CLI resources more test friendly
Fix some test race conditions as well.
---
.../cli/commands/address/AddressAbstract.java | 3 +-
.../cli/commands/messages/ConnectionAbstract.java | 9 +-
.../artemis/cli/commands/messages/Consumer.java | 45 ++++++
.../cli/commands/messages/DestAbstract.java | 54 +++++++
.../artemis/cli/commands/messages/Producer.java | 78 +++++++++-
.../artemis/cli/commands/queue/QueueAbstract.java | 6 +-
.../artemis/cli/commands/queue/StatQueue.java | 18 ++-
.../org/apache/activemq/cli/test/ArtemisTest.java | 2 +-
.../apache/activemq/cli/test/CliProducerTest.java | 28 ++--
.../org/apache/activemq/cli/test/CliTestBase.java | 26 ++--
.../activemq/cli/test/MessageSerializerTest.java | 168 +++++++++++----------
11 files changed, 311 insertions(+), 126 deletions(-)
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/AddressAbstract.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/AddressAbstract.java
index 3eceb03..7b35262 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/AddressAbstract.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/AddressAbstract.java
@@ -38,8 +38,9 @@ public abstract class AddressAbstract extends AbstractAction {
private Boolean noMulticast;
- public void setName(String name) {
+ public AbstractAction setName(String name) {
this.name = name;
+ return this;
}
public String getName(boolean requireInput) {
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConnectionAbstract.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConnectionAbstract.java
index c170259..1d252c3 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConnectionAbstract.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConnectionAbstract.java
@@ -48,24 +48,27 @@ public class ConnectionAbstract extends InputAbstract {
return user;
}
- public void setUser(String user) {
+ public ConnectionAbstract setUser(String user) {
this.user = user;
+ return this;
}
public String getPassword() {
return password;
}
- public void setPassword(String password) {
+ public ConnectionAbstract setPassword(String password) {
this.password = password;
+ return this;
}
public String getClientID() {
return clientID;
}
- public void setClientID(String clientID) {
+ public ConnectionAbstract setClientID(String clientID) {
this.clientID = clientID;
+ return this;
}
public String getProtocol() {
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java
index 71eac78..498d850 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java
@@ -140,4 +140,49 @@ public class Consumer extends DestAbstract {
messageSerializer.write(message);
}
}
+
+ public boolean isDurable() {
+ return durable;
+ }
+
+ public Consumer setDurable(boolean durable) {
+ this.durable = durable;
+ return this;
+ }
+
+ public boolean isBreakOnNull() {
+ return breakOnNull;
+ }
+
+ public Consumer setBreakOnNull(boolean breakOnNull) {
+ this.breakOnNull = breakOnNull;
+ return this;
+ }
+
+ public int getReceiveTimeout() {
+ return receiveTimeout;
+ }
+
+ public Consumer setReceiveTimeout(int receiveTimeout) {
+ this.receiveTimeout = receiveTimeout;
+ return this;
+ }
+
+ public String getFilter() {
+ return filter;
+ }
+
+ public Consumer setFilter(String filter) {
+ this.filter = filter;
+ return this;
+ }
+
+ public String getFile() {
+ return file;
+ }
+
+ public Consumer setFile(String file) {
+ this.file = file;
+ return this;
+ }
}
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/DestAbstract.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/DestAbstract.java
index 0611ec5..360bac6 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/DestAbstract.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/DestAbstract.java
@@ -79,4 +79,58 @@ public class DestAbstract extends ConnectionAbstract {
return destination;
}
}
+
+ public String getDestination() {
+ return destination;
+ }
+
+ public DestAbstract setDestination(String destination) {
+ this.destination = destination;
+ return this;
+ }
+
+ public int getMessageCount() {
+ return messageCount;
+ }
+
+ public DestAbstract setMessageCount(int messageCount) {
+ this.messageCount = messageCount;
+ return this;
+ }
+
+ public int getSleep() {
+ return sleep;
+ }
+
+ public DestAbstract setSleep(int sleep) {
+ this.sleep = sleep;
+ return this;
+ }
+
+ public int getTxBatchSize() {
+ return txBatchSize;
+ }
+
+ public DestAbstract setTxBatchSize(int txBatchSize) {
+ this.txBatchSize = txBatchSize;
+ return this;
+ }
+
+ public int getThreads() {
+ return threads;
+ }
+
+ public DestAbstract setThreads(int threads) {
+ this.threads = threads;
+ return this;
+ }
+
+ public String getSerializer() {
+ return serializer;
+ }
+
+ public DestAbstract setSerializer(String serializer) {
+ this.serializer = serializer;
+ return this;
+ }
}
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java
index 442017c..6d060fe 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java
@@ -59,7 +59,79 @@ public class Producer extends DestAbstract {
String msgGroupID = null;
@Option(name = "--data", description = "Messages will be read form the specified file, other message options will be ignored.")
- String fileName = null;
+ String file = null;
+
+ public boolean isNonpersistent() {
+ return nonpersistent;
+ }
+
+ public Producer setNonpersistent(boolean nonpersistent) {
+ this.nonpersistent = nonpersistent;
+ return this;
+ }
+
+ public int getMessageSize() {
+ return messageSize;
+ }
+
+ public Producer setMessageSize(int messageSize) {
+ this.messageSize = messageSize;
+ return this;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public Producer setMessage(String message) {
+ this.message = message;
+ return this;
+ }
+
+ public int getTextMessageSize() {
+ return textMessageSize;
+ }
+
+ public Producer setTextMessageSize(int textMessageSize) {
+ this.textMessageSize = textMessageSize;
+ return this;
+ }
+
+ public int getObjectSize() {
+ return objectSize;
+ }
+
+ public Producer setObjectSize(int objectSize) {
+ this.objectSize = objectSize;
+ return this;
+ }
+
+ public long getMsgTTL() {
+ return msgTTL;
+ }
+
+ public Producer setMsgTTL(long msgTTL) {
+ this.msgTTL = msgTTL;
+ return this;
+ }
+
+ public String getMsgGroupID() {
+ return msgGroupID;
+ }
+
+ public Producer setMsgGroupID(String msgGroupID) {
+ this.msgGroupID = msgGroupID;
+ return this;
+ }
+
+ public String getFile() {
+ return file;
+ }
+
+ public Producer setFile(String file) {
+ this.file = file;
+ return this;
+ }
@Override
public Object execute(ActionContext context) throws Exception {
@@ -70,7 +142,7 @@ public class Producer extends DestAbstract {
try (Connection connection = factory.createConnection()) {
// If we are reading from file, we process messages sequentially to guarantee ordering. i.e. no thread creation.
- if (fileName != null) {
+ if (file != null) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Destination dest = getDestination(session);
@@ -87,7 +159,7 @@ public class Producer extends DestAbstract {
InputStream in;
try {
- in = new FileInputStream(fileName);
+ in = new FileInputStream(file);
} catch (Exception e) {
System.err.println("Error: Unable to open file for reading\n" + e.getMessage());
return null;
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/queue/QueueAbstract.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/queue/QueueAbstract.java
index 2ff1b6d..12f0e37 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/queue/QueueAbstract.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/queue/QueueAbstract.java
@@ -203,12 +203,14 @@ public class QueueAbstract extends AbstractAction {
this.purgeOnNoConsumers = purgeOnNoConsumers;
}
- public void setAddress(String address) {
+ public QueueAbstract setAddress(String address) {
this.address = address;
+ return this;
}
- public void setName(String name) {
+ public QueueAbstract setName(String name) {
this.name = name;
+ return this;
}
public String getName() {
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/queue/StatQueue.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/queue/StatQueue.java
index 2d74fc4..e9895bd 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/queue/StatQueue.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/queue/StatQueue.java
@@ -70,28 +70,34 @@ public class StatQueue extends AbstractAction {
private int maxRows = 50;
//easier for testing
- public void setQueueName(String queueName) {
+ public StatQueue setQueueName(String queueName) {
this.queueName = queueName;
+ return this;
}
- public void setOperationName(String operationName) {
+ public StatQueue setOperationName(String operationName) {
this.operationName = operationName;
+ return this;
}
- public void setFieldName(String fieldName) {
+ public StatQueue setFieldName(String fieldName) {
this.fieldName = fieldName;
+ return this;
}
- public void setValue(String value) {
+ public StatQueue setValue(String value) {
this.value = value;
+ return this;
}
- public void setMaxRows(int maxRows) {
+ public StatQueue setMaxRows(int maxRows) {
this.maxRows = maxRows;
+ return this;
}
- public void setverbose(boolean verbose) {
+ public StatQueue setverbose(boolean verbose) {
this.verbose = verbose;
+ return this;
}
@Override
diff --git a/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java b/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java
index 42bc468..7a26c34 100644
--- a/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java
+++ b/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java
@@ -1356,7 +1356,7 @@ public class ArtemisTest extends CliTestBase {
}
//read individual lines from byteStream
- private ArrayList<String> getOutputLines(TestActionContext context, boolean errorOutput) throws IOException {
+ public static ArrayList<String> getOutputLines(TestActionContext context, boolean errorOutput) throws IOException {
byte[] bytes;
if (errorOutput) {
diff --git a/artemis-cli/src/test/java/org/apache/activemq/cli/test/CliProducerTest.java b/artemis-cli/src/test/java/org/apache/activemq/cli/test/CliProducerTest.java
index 0f39a1f..fa74713 100644
--- a/artemis-cli/src/test/java/org/apache/activemq/cli/test/CliProducerTest.java
+++ b/artemis-cli/src/test/java/org/apache/activemq/cli/test/CliProducerTest.java
@@ -16,7 +16,8 @@
*/
package org.apache.activemq.cli.test;
-import org.apache.activemq.artemis.cli.Artemis;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.cli.commands.messages.Producer;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.junit.After;
@@ -54,22 +55,17 @@ public class CliProducerTest extends CliTestBase {
}
private void produceMessages(String address, String message, int msgCount) throws Exception {
- Artemis.main("producer",
- "--user", "admin",
- "--password", "admin",
- "--destination", address,
- "--message", message,
- "--message-count", String.valueOf(msgCount)
- );
+ new Producer()
+ .setMessage(message)
+ .setMessageCount(msgCount)
+ .setDestination(address)
+ .setUser("admin")
+ .setPassword("admin")
+ .execute(new TestActionContext());
}
private void produceMessages(String address, int msgCount) throws Exception {
- Artemis.main("producer",
- "--user", "admin",
- "--password", "admin",
- "--destination", address,
- "--message-count", String.valueOf(msgCount)
- );
+ produceMessages(address, null, msgCount);
}
private void checkSentMessages(Session session, String address, String messageBody) throws Exception {
@@ -98,7 +94,7 @@ public class CliProducerTest extends CliTestBase {
String queue = "queue";
String fqqn = address + "::" + queue;
- createQueue("--multicast", address, queue);
+ createQueue(RoutingType.MULTICAST, address, queue);
Session session = createSession(connection);
produceMessages("topic://" + address, TEST_MESSAGE_COUNT);
@@ -113,7 +109,7 @@ public class CliProducerTest extends CliTestBase {
String fqqn = address + "::" + queue;
String messageBody = new StringGenerator().generateRandomString(20);
- createQueue("--multicast", address, queue);
+ createQueue(RoutingType.MULTICAST, address, queue);
Session session = createSession(connection);
produceMessages("topic://" + address, messageBody, TEST_MESSAGE_COUNT);
diff --git a/artemis-cli/src/test/java/org/apache/activemq/cli/test/CliTestBase.java b/artemis-cli/src/test/java/org/apache/activemq/cli/test/CliTestBase.java
index f3d1591..f77b242 100644
--- a/artemis-cli/src/test/java/org/apache/activemq/cli/test/CliTestBase.java
+++ b/artemis-cli/src/test/java/org/apache/activemq/cli/test/CliTestBase.java
@@ -16,9 +16,11 @@
*/
package org.apache.activemq.cli.test;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.cli.Artemis;
import org.apache.activemq.artemis.cli.commands.Run;
+import org.apache.activemq.artemis.cli.commands.queue.CreateQueue;
import org.apache.activemq.artemis.cli.commands.tools.LockAbstract;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
@@ -87,7 +89,7 @@ public class CliTestBase {
File rootDirectory = new File(temporaryFolder.getRoot(), "broker");
setupAuth(rootDirectory);
Run.setEmbedded(true);
- Artemis.main("create", rootDirectory.getAbsolutePath(), "--silent", "--no-fsync", "--no-autotune", "--no-web", "--require-login");
+ Artemis.main("create", rootDirectory.getAbsolutePath(), "--silent", "--no-fsync", "--no-autotune", "--no-web", "--require-login", "--disable-persistence");
System.setProperty("artemis.instance", rootDirectory.getAbsolutePath());
Artemis.internalExecute("run");
}
@@ -110,16 +112,18 @@ public class CliTestBase {
return new ActiveMQConnectionFactory("tcp://localhost:" + String.valueOf(serverPort));
}
- protected void createQueue(String routingTypeOption, String address, String queueName) throws Exception {
- Artemis.main("queue", "create",
- "--user", "admin",
- "--password", "admin",
- "--address", address,
- "--name", queueName,
- routingTypeOption,
- "--durable",
- "--preserve-on-no-consumers",
- "--auto-create-address");
+ protected void createQueue(RoutingType routingType, String address, String queueName) throws Exception {
+ new CreateQueue()
+ .setAddress(address)
+ .setName(queueName)
+ .setAnycast(RoutingType.ANYCAST.equals(routingType))
+ .setMulticast(RoutingType.MULTICAST.equals(routingType))
+ .setDurable(true)
+ .setPreserveOnNoConsumers(true)
+ .setAutoCreateAddress(true)
+ .setUser("admin")
+ .setPassword("admin")
+ .execute(new TestActionContext());
}
void closeConnection(ActiveMQConnectionFactory cf, Connection connection) throws Exception {
diff --git a/artemis-cli/src/test/java/org/apache/activemq/cli/test/MessageSerializerTest.java b/artemis-cli/src/test/java/org/apache/activemq/cli/test/MessageSerializerTest.java
index 51cc3e5..92e1567 100644
--- a/artemis-cli/src/test/java/org/apache/activemq/cli/test/MessageSerializerTest.java
+++ b/artemis-cli/src/test/java/org/apache/activemq/cli/test/MessageSerializerTest.java
@@ -30,20 +30,24 @@ import javax.jms.TopicSubscriber;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.cli.Artemis;
+import org.apache.activemq.artemis.cli.commands.address.CreateAddress;
+import org.apache.activemq.artemis.cli.commands.messages.Consumer;
+import org.apache.activemq.artemis.cli.commands.messages.Producer;
+import org.apache.activemq.artemis.cli.commands.queue.StatQueue;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import static org.apache.activemq.cli.test.ArtemisTest.getOutputLines;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@@ -78,17 +82,6 @@ public class MessageSerializerTest extends CliTestBase {
return temporaryFolder.newFile("messages.xml");
}
- private List<Message> generateTextMessages(Session session, String address) throws Exception {
- List<Message> messages = new ArrayList<>(TEST_MESSAGE_COUNT);
- for (int i = 0; i < TEST_MESSAGE_COUNT; i++) {
- messages.add(session.createTextMessage(RandomUtil.randomString()));
- }
-
- sendMessages(session, address, messages);
-
- return messages;
- }
-
private List<Message> generateTextMessages(Session session, Destination destination) throws Exception {
List<Message> messages = new ArrayList<>(TEST_MESSAGE_COUNT);
for (int i = 0; i < TEST_MESSAGE_COUNT; i++) {
@@ -101,17 +94,38 @@ public class MessageSerializerTest extends CliTestBase {
}
private void checkSentMessages(Session session, List<Message> messages, String address) throws Exception {
- List<Message> recieved = consumeMessages(session, address, TEST_MESSAGE_COUNT, CompositeAddress.isFullyQualified(address));
+ checkSentMessages(session, messages, address, null);
+ }
+
+ private void checkSentMessages(Session session, List<Message> messages, String address, String key) throws Exception {
+ List<Message> received = consumeMessages(session, address, TEST_MESSAGE_COUNT, CompositeAddress.isFullyQualified(address));
for (int i = 0; i < TEST_MESSAGE_COUNT; i++) {
- assertEquals(((TextMessage) messages.get(i)).getText(), ((TextMessage) recieved.get(i)).getText());
+ Message m = messages.get(i);
+ if (m instanceof TextMessage) {
+ assertEquals(((TextMessage) m).getText(), ((TextMessage) received.get(i)).getText());
+ } else if (m instanceof ObjectMessage) {
+ assertEquals(((ObjectMessage) m).getObject(), ((ObjectMessage) received.get(i)).getObject());
+ } else if (m instanceof MapMessage) {
+ assertEquals(((MapMessage) m).getString(key), ((MapMessage) received.get(i)).getString(key));
+ }
}
}
- private void sendMessages(Session session, String address, List<Message> messages) throws Exception {
- MessageProducer producer = session.createProducer(getDestination(address));
- for (Message m : messages) {
- producer.send(m);
+ private boolean verifyMessageCount(String address, int messageCount) throws Exception {
+ TestActionContext context = new TestActionContext();
+ new StatQueue()
+ .setQueueName(address)
+ .setUser("admin")
+ .setPassword("admin")
+ .execute(context);
+ int currentMessageCount;
+ try {
+ // parse the value for MESSAGE_COUNT from the output
+ currentMessageCount = Integer.parseInt(getOutputLines(context, false).get(1).split("\\|")[4].trim());
+ } catch (Exception e) {
+ currentMessageCount = 0;
}
+ return (messageCount == currentMessageCount);
}
private void sendMessages(Session session, Destination destination, List<Message> messages) throws Exception {
@@ -125,35 +139,35 @@ public class MessageSerializerTest extends CliTestBase {
exportMessages(address, TEST_MESSAGE_COUNT, false, "test-client", output);
}
- private void exportMessages(String address, int noMessages, boolean durable, String clientId, File output) throws Exception {
- List<String> args = new ArrayList<>(Arrays.asList("consumer",
- "--user", "admin",
- "--password", "admin",
- "--destination", address,
- "--message-count", Integer.toString(noMessages),
- "--data", output.getAbsolutePath(),
- "--clientID", clientId));
- if (durable) {
- args.add("--durable");
- }
-
- Artemis.main(args.toArray(new String[0]));
+ private void exportMessages(String address, int messageCount, boolean durable, String clientId, File output) throws Exception {
+ new Consumer()
+ .setFile(output.getAbsolutePath())
+ .setDurable(durable)
+ .setDestination(address)
+ .setMessageCount(messageCount)
+ .setUser("admin")
+ .setPassword("admin")
+ .setClientID(clientId)
+ .execute(new TestActionContext());
}
private void importMessages(String address, File input) throws Exception {
- Artemis.main("producer",
- "--user", "admin",
- "--password", "admin",
- "--destination", address,
- "--data", input.getAbsolutePath());
+ new Producer()
+ .setFile(input.getAbsolutePath())
+ .setDestination(address)
+ .setUser("admin")
+ .setPassword("admin")
+ .execute(new TestActionContext());
}
private void createBothTypeAddress(String address) throws Exception {
- Artemis.main("address", "create",
- "--user", "admin",
- "--password", "admin",
- "--name", address,
- "--anycast", "--multicast");
+ new CreateAddress()
+ .setAnycast(true)
+ .setMulticast(true)
+ .setName(address)
+ .setUser("admin")
+ .setPassword("admin")
+ .execute(new TestActionContext());
}
@Test
@@ -163,18 +177,15 @@ public class MessageSerializerTest extends CliTestBase {
Session session = createSession(connection);
- List<Message> messages = generateTextMessages(session, address);
+ List<Message> sent = generateTextMessages(session, getDestination(address));
exportMessages(address, file);
- // Ensure there's nothing left to consume
- MessageConsumer consumer = session.createConsumer(getDestination(address));
- assertNull(consumer.receive(1000));
- consumer.close();
+ Wait.assertTrue(() -> verifyMessageCount(address, 0), 2000, 100);
importMessages(address, file);
-
- checkSentMessages(session, messages, address);
+ Wait.assertTrue(() -> verifyMessageCount(address, TEST_MESSAGE_COUNT), 2000, 100);
+ checkSentMessages(session, sent, address);
}
@Test
@@ -190,19 +201,14 @@ public class MessageSerializerTest extends CliTestBase {
sent.add(session.createObjectMessage(UUID.randomUUID()));
}
- sendMessages(session, address, sent);
+ sendMessages(session, getDestination(address), sent);
exportMessages(address, file);
- // Ensure there's nothing left to consume
- MessageConsumer consumer = session.createConsumer(getDestination(address));
- assertNull(consumer.receive(1000));
- consumer.close();
+ Wait.assertTrue(() -> verifyMessageCount(address, 0), 2000, 100);
importMessages(address, file);
- List<Message> received = consumeMessages(session, address, TEST_MESSAGE_COUNT, false);
- for (int i = 0; i < TEST_MESSAGE_COUNT; i++) {
- assertEquals(((ObjectMessage) sent.get(i)).getObject(), ((ObjectMessage) received.get(i)).getObject());
- }
+ Wait.assertTrue(() -> verifyMessageCount(address, TEST_MESSAGE_COUNT), 2000, 100);
+ checkSentMessages(session, sent, address);
}
@Test
@@ -220,19 +226,14 @@ public class MessageSerializerTest extends CliTestBase {
sent.add(m);
}
- sendMessages(session, address, sent);
+ sendMessages(session, getDestination(address), sent);
exportMessages(address, file);
- // Ensure there's nothing left to consume
- MessageConsumer consumer = session.createConsumer(getDestination(address));
- assertNull(consumer.receive(1000));
- consumer.close();
+ Wait.assertTrue(() -> verifyMessageCount(address, 0), 2000, 100);
importMessages(address, file);
- List<Message> received = consumeMessages(session, address, TEST_MESSAGE_COUNT, false);
- for (int i = 0; i < TEST_MESSAGE_COUNT; i++) {
- assertEquals(((MapMessage) sent.get(i)).getString(key), ((MapMessage) received.get(i)).getString(key));
- }
+ Wait.assertTrue(() -> verifyMessageCount(address, TEST_MESSAGE_COUNT), 2000, 100);
+ checkSentMessages(session, sent, address, key);
}
@Test
@@ -251,8 +252,8 @@ public class MessageSerializerTest extends CliTestBase {
String queue1Name = "queue1";
String queue2Name = "queue2";
- createQueue("--" + routingType.toString().toLowerCase(), address, queue1Name);
- createQueue("--" + routingType.toString().toLowerCase(), address, queue2Name);
+ createQueue(routingType, address, queue1Name);
+ createQueue(routingType, address, queue2Name);
try (ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = cf.createConnection("admin", "admin");) {
@@ -265,11 +266,12 @@ public class MessageSerializerTest extends CliTestBase {
MessageConsumer consumer1 = session.createConsumer(queue1);
MessageConsumer consumer2 = session.createConsumer(queue2);
- Artemis.main("producer",
- "--user", "admin",
- "--password", "admin",
- "--destination", (routingType == RoutingType.ANYCAST ? ActiveMQDestination.QUEUE_QUALIFIED_PREFIX : ActiveMQDestination.TOPIC_QUALIFIED_PREFIX) + CompositeAddress.toFullyQualified(address, queue1Name),
- "--message-count", "5");
+ new Producer()
+ .setDestination((routingType == RoutingType.ANYCAST ? ActiveMQDestination.QUEUE_QUALIFIED_PREFIX : ActiveMQDestination.TOPIC_QUALIFIED_PREFIX) + CompositeAddress.toFullyQualified(address, queue1Name))
+ .setMessageCount(5)
+ .setUser("admin")
+ .setPassword("admin")
+ .execute(new TestActionContext());
assertNull(consumer2.receive(1000));
assertNotNull(consumer1.receive(1000));
@@ -285,7 +287,7 @@ public class MessageSerializerTest extends CliTestBase {
File file = createMessageFile();
- createQueue("--multicast", addr, queue);
+ createQueue(RoutingType.MULTICAST, addr, queue);
Session session = createSession(connection);
@@ -308,12 +310,12 @@ public class MessageSerializerTest extends CliTestBase {
File file = createMessageFile();
- createQueue("--multicast", mAddress, queueM1Name);
- createQueue("--multicast", mAddress, queueM2Name);
+ createQueue(RoutingType.MULTICAST, mAddress, queueM1Name);
+ createQueue(RoutingType.MULTICAST, mAddress, queueM2Name);
Session session = createSession(connection);
- List<Message> messages = generateTextMessages(session, aAddress);
+ List<Message> messages = generateTextMessages(session, getDestination(aAddress));
exportMessages(aAddress, file);
importMessages(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX + mAddress, file);
@@ -333,12 +335,12 @@ public class MessageSerializerTest extends CliTestBase {
File file = createMessageFile();
- createQueue("--multicast", mAddress, queueM1Name);
- createQueue("--multicast", mAddress, queueM2Name);
+ createQueue(RoutingType.MULTICAST, mAddress, queueM1Name);
+ createQueue(RoutingType.MULTICAST, mAddress, queueM2Name);
Session session = createSession(connection);
- List<Message> messages = generateTextMessages(session, aAddress);
+ List<Message> messages = generateTextMessages(session, getDestination(aAddress));
exportMessages(aAddress, file);
importMessages(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX + fqqnMulticast1, file);
@@ -380,12 +382,12 @@ public class MessageSerializerTest extends CliTestBase {
connection.setClientID(clientId);
createBothTypeAddress(address);
- createQueue("--anycast", address, address);
+ createQueue(RoutingType.ANYCAST, address, address);
Session session = createSession(connection);
TopicSubscriber subscriber = session.createDurableSubscriber(session.createTopic(address), "test-subscriber");
- List<Message> messages = generateTextMessages(session, address);
+ List<Message> messages = generateTextMessages(session, getDestination(address));
exportMessages(address, file);