You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by an...@apache.org on 2017/04/11 06:58:50 UTC
[2/2] activemq-artemis git commit: ARTEMIS-1107 test
AddressControl.sendMessage
ARTEMIS-1107 test AddressControl.sendMessage
Add tests for this management operation with both core and AMQP encoded
messages. Also fix a few problems with the implementation like not
checking the passed-in headers for null and not counting messages
properly.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/67a06588
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/67a06588
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/67a06588
Branch: refs/heads/master
Commit: 67a06588f41789f50b6eeea1a77da71abdbe4141
Parents: 359592c
Author: Justin Bertram <jb...@apache.org>
Authored: Mon Apr 10 12:57:02 2017 -0500
Committer: Andy Taylor <an...@gmail.com>
Committed: Tue Apr 11 07:57:57 2017 +0100
----------------------------------------------------------------------
.../management/impl/AddressControlImpl.java | 8 ++--
.../tests/integration/amqp/ProtonTest.java | 42 ++++++++++++++++-
.../tests/integration/amqp/ProtonTestBase.java | 2 +-
.../management/AddressControlTest.java | 47 ++++++++++++++++++++
4 files changed, 94 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/67a06588/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
index afdca45..a321165 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
@@ -291,8 +291,10 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
}
});
CoreMessage message = new CoreMessage(storageManager.generateID(), 50);
- for (String header : headers.keySet()) {
- message.putStringProperty(new SimpleString(header), new SimpleString(headers.get(header)));
+ if (headers != null) {
+ for (String header : headers.keySet()) {
+ message.putStringProperty(new SimpleString(header), new SimpleString(headers.get(header)));
+ }
}
message.setType((byte) type);
message.setDurable(durable);
@@ -341,7 +343,7 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
QueueControl coreQueueControl = (QueueControl) managementService.getResource(ResourceNames.QUEUE + queue);
// Ignore the "special" subscription
- if (coreQueueControl != null && !coreQueueControl.getName().equals(getAddress())) {
+ if (coreQueueControl != null) {
if (durability == DurabilityType.ALL || durability == DurabilityType.DURABLE && coreQueueControl.isDurable() ||
durability == DurabilityType.NON_DURABLE && !coreQueueControl.isDurable()) {
matchingQueues.add(coreQueueControl);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/67a06588/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
index 199d9c5..f443338 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
@@ -39,6 +39,8 @@ import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerFactory;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
@@ -57,6 +59,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.postoffice.Bindings;
@@ -74,8 +77,11 @@ import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientConnectionMa
import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.artemis.utils.Base64;
import org.apache.activemq.artemis.utils.ByteUtil;
+import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.TimeUtils;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.VersionLoader;
@@ -109,12 +115,14 @@ public class ProtonTest extends ProtonTestBase {
private static final String amqpConnectionUri = "amqp://localhost:5672";
private static final String tcpAmqpConnectionUri = "tcp://localhost:5672";
- private static final String brokerName = "my-broker";
+ private static final String brokerName = "localhost";
private static final long maxSizeBytes = 1 * 1024 * 1024;
private static final long maxSizeBytesRejectThreshold = 2 * 1024 * 1024;
+ private MBeanServer mBeanServer = MBeanServerFactory.createMBeanServer();
+
private int messagesSent = 0;
// this will ensure that all tests in this class are run twice,
@@ -150,6 +158,8 @@ public class ProtonTest extends ProtonTestBase {
protected ActiveMQServer createAMQPServer(int port) throws Exception {
ActiveMQServer server = super.createAMQPServer(port);
server.getConfiguration().addAcceptorConfiguration("flow", "tcp://localhost:" + (8 + port) + "?protocols=AMQP;useEpoll=false;amqpCredits=1;amqpMinCredits=1");
+ server.setMBeanServer(mBeanServer);
+ server.getConfiguration().setJMXManagementEnabled(true);
return server;
}
@@ -246,6 +256,36 @@ public class ProtonTest extends ProtonTestBase {
}
@Test
+ public void testAddressControlSendMessage() throws Exception {
+ SimpleString address = RandomUtil.randomSimpleString();
+ server.createQueue(address, RoutingType.ANYCAST, address, null, true, false);
+
+ AddressControl addressControl = ManagementControlHelper.createAddressControl(address, mBeanServer);
+ Assert.assertEquals(1, addressControl.getQueueNames().length);
+ addressControl.sendMessage(null, org.apache.activemq.artemis.api.core.Message.BYTES_TYPE, Base64.encodeBytes("test".getBytes()), false, null, null);
+
+ Assert.assertEquals(1, addressControl.getMessageCount());
+
+ Connection connection = createConnection("myClientId");
+ try {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ javax.jms.Queue queue = session.createQueue(address.toString());
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(500);
+ assertNotNull(message);
+ byte[] buffer = new byte[(int)((BytesMessage)message).getBodyLength()];
+ ((BytesMessage)message).readBytes(buffer);
+ assertEquals("test", new String(buffer));
+ session.close();
+ connection.close();
+ } finally {
+ if (connection != null) {
+ connection.close();
+ }
+ }
+ }
+
+ @Test
public void testDurableSubscriptionUnsubscribe() throws Exception {
Connection connection = createConnection("myClientId");
try {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/67a06588/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java
index 1a06c54..599022e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java
@@ -30,7 +30,7 @@ import org.junit.Before;
public class ProtonTestBase extends ActiveMQTestBase {
- protected String brokerName = "my-broker";
+ protected String brokerName = "localhost";
protected ActiveMQServer server;
protected String tcpAmqpConnectionUri = "tcp://localhost:5672";
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/67a06588/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java
index e62ce12..85f6eca 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java
@@ -23,9 +23,12 @@ import java.util.HashSet;
import java.util.Set;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.JsonUtil;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
@@ -33,12 +36,14 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.api.core.management.RoleInfo;
+import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.utils.Base64;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert;
import org.junit.Before;
@@ -310,6 +315,48 @@ public class AddressControlTest extends ManagementTestBase {
assertEquals(RoutingType.ANYCAST.toString(), ((JsonString) jsonArray.get(0)).getString());
}
+ @Test
+ public void testGetMessageCount() throws Exception {
+ SimpleString address = RandomUtil.randomSimpleString();
+ session.createAddress(address, RoutingType.ANYCAST, false);
+
+ AddressControl addressControl = createManagementControl(address);
+ assertEquals(0, addressControl.getMessageCount());
+
+ ClientProducer producer = session.createProducer(address.toString());
+ producer.send(session.createMessage(false));
+ assertEquals(0, addressControl.getMessageCount());
+
+ session.createQueue(address, RoutingType.ANYCAST, address);
+ producer.send(session.createMessage(false));
+ assertEquals(1, addressControl.getMessageCount());
+
+ session.createQueue(address, RoutingType.ANYCAST, address.concat('2'));
+ producer.send(session.createMessage(false));
+ assertEquals(2, addressControl.getMessageCount());
+ }
+
+ @Test
+ public void testSendMessage() throws Exception {
+ SimpleString address = RandomUtil.randomSimpleString();
+ session.createAddress(address, RoutingType.ANYCAST, false);
+
+ AddressControl addressControl = createManagementControl(address);
+ Assert.assertEquals(0, addressControl.getQueueNames().length);
+ session.createQueue(address, RoutingType.ANYCAST, address);
+ Assert.assertEquals(1, addressControl.getQueueNames().length);
+ addressControl.sendMessage(null, Message.BYTES_TYPE, Base64.encodeBytes("test".getBytes()), false, null, null);
+
+ Assert.assertEquals(1, addressControl.getMessageCount());
+
+ ClientConsumer consumer = session.createConsumer(address);
+ ClientMessage message = consumer.receive(500);
+ assertNotNull(message);
+ byte[] buffer = new byte[message.getBodyBuffer().readableBytes()];
+ message.getBodyBuffer().readBytes(buffer);
+ assertEquals("test", new String(buffer));
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------