You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by an...@apache.org on 2017/04/11 06:58:50 UTC

[2/2] activemq-artemis git commit: ARTEMIS-1107 test AddressControl.sendMessage

ARTEMIS-1107 test AddressControl.sendMessage

Add tests for this management operation with both core and AMQP encoded
messages. Also fix a few problems with the implementation like not
checking the passed-in headers for null and not counting messages
properly.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/67a06588
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/67a06588
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/67a06588

Branch: refs/heads/master
Commit: 67a06588f41789f50b6eeea1a77da71abdbe4141
Parents: 359592c
Author: Justin Bertram <jb...@apache.org>
Authored: Mon Apr 10 12:57:02 2017 -0500
Committer: Andy Taylor <an...@gmail.com>
Committed: Tue Apr 11 07:57:57 2017 +0100

----------------------------------------------------------------------
 .../management/impl/AddressControlImpl.java     |  8 ++--
 .../tests/integration/amqp/ProtonTest.java      | 42 ++++++++++++++++-
 .../tests/integration/amqp/ProtonTestBase.java  |  2 +-
 .../management/AddressControlTest.java          | 47 ++++++++++++++++++++
 4 files changed, 94 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/67a06588/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
index afdca45..a321165 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
@@ -291,8 +291,10 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
             }
          });
          CoreMessage message = new CoreMessage(storageManager.generateID(), 50);
-         for (String header : headers.keySet()) {
-            message.putStringProperty(new SimpleString(header), new SimpleString(headers.get(header)));
+         if (headers != null) {
+            for (String header : headers.keySet()) {
+               message.putStringProperty(new SimpleString(header), new SimpleString(headers.get(header)));
+            }
          }
          message.setType((byte) type);
          message.setDurable(durable);
@@ -341,7 +343,7 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
             QueueControl coreQueueControl = (QueueControl) managementService.getResource(ResourceNames.QUEUE + queue);
 
             // Only queues without a registered control are skipped; a queue named like the address is counted
-            if (coreQueueControl != null && !coreQueueControl.getName().equals(getAddress())) {
+            if (coreQueueControl != null) {
                if (durability == DurabilityType.ALL || durability == DurabilityType.DURABLE && coreQueueControl.isDurable() ||
                      durability == DurabilityType.NON_DURABLE && !coreQueueControl.isDurable()) {
                   matchingQueues.add(coreQueueControl);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/67a06588/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
index 199d9c5..f443338 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
@@ -39,6 +39,8 @@ import javax.jms.Topic;
 import javax.jms.TopicPublisher;
 import javax.jms.TopicSession;
 import javax.jms.TopicSubscriber;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerFactory;
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.URI;
@@ -57,6 +59,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.management.AddressControl;
 import org.apache.activemq.artemis.api.core.management.ResourceNames;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.postoffice.Bindings;
@@ -74,8 +77,11 @@ import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientConnectionMa
 import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolManager;
 import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
 import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.artemis.utils.Base64;
 import org.apache.activemq.artemis.utils.ByteUtil;
+import org.apache.activemq.artemis.utils.RandomUtil;
 import org.apache.activemq.artemis.utils.TimeUtils;
 import org.apache.activemq.artemis.utils.UUIDGenerator;
 import org.apache.activemq.artemis.utils.VersionLoader;
@@ -109,12 +115,14 @@ public class ProtonTest extends ProtonTestBase {
    private static final String amqpConnectionUri = "amqp://localhost:5672";
 
    private static final String tcpAmqpConnectionUri = "tcp://localhost:5672";
-   private static final String brokerName = "my-broker";
+   private static final String brokerName = "localhost";
 
    private static final long maxSizeBytes = 1 * 1024 * 1024;
 
    private static final long maxSizeBytesRejectThreshold = 2 * 1024 * 1024;
 
+   private MBeanServer mBeanServer = MBeanServerFactory.createMBeanServer();
+
    private int messagesSent = 0;
 
    // this will ensure that all tests in this class are run twice,
@@ -150,6 +158,8 @@ public class ProtonTest extends ProtonTestBase {
    protected ActiveMQServer createAMQPServer(int port) throws Exception {
       ActiveMQServer server = super.createAMQPServer(port);
       server.getConfiguration().addAcceptorConfiguration("flow", "tcp://localhost:" + (8 + port) + "?protocols=AMQP;useEpoll=false;amqpCredits=1;amqpMinCredits=1");
+      server.setMBeanServer(mBeanServer); // same MBeanServer the test later hands to ManagementControlHelper
+      server.getConfiguration().setJMXManagementEnabled(true); // turn on JMX management in this server's configuration
       return server;
    }
 
@@ -246,6 +256,36 @@ public class ProtonTest extends ProtonTestBase {
    }
 
    @Test
+   public void testAddressControlSendMessage() throws Exception {
+      // Sends a message through the AddressControl management API and reads it back over JMS.
+      SimpleString address = RandomUtil.randomSimpleString();
+      server.createQueue(address, RoutingType.ANYCAST, address, null, true, false);
+
+      AddressControl addressControl = ManagementControlHelper.createAddressControl(address, mBeanServer);
+      Assert.assertEquals(1, addressControl.getQueueNames().length);
+      // Null headers are passed on purpose: sendMessage must tolerate them.
+      addressControl.sendMessage(null, org.apache.activemq.artemis.api.core.Message.BYTES_TYPE, Base64.encodeBytes("test".getBytes(java.nio.charset.StandardCharsets.UTF_8)), false, null, null);
+
+      Assert.assertEquals(1, addressControl.getMessageCount());
+
+      Connection connection = createConnection("myClientId");
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         javax.jms.Queue queue = session.createQueue(address.toString());
+         MessageConsumer consumer = session.createConsumer(queue);
+         Message message = consumer.receive(500);
+         assertNotNull(message);
+         BytesMessage bytesMessage = (BytesMessage) message;
+         byte[] buffer = new byte[(int) bytesMessage.getBodyLength()];
+         bytesMessage.readBytes(buffer);
+         assertEquals("test", new String(buffer, java.nio.charset.StandardCharsets.UTF_8));
+      } finally {
+         // Closing the connection also closes the session and consumer created from it.
+         connection.close();
+      }
+   }
+
+   @Test
    public void testDurableSubscriptionUnsubscribe() throws Exception {
       Connection connection = createConnection("myClientId");
       try {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/67a06588/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java
index 1a06c54..599022e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java
@@ -30,7 +30,7 @@ import org.junit.Before;
 
 public class ProtonTestBase extends ActiveMQTestBase {
 
-   protected String brokerName = "my-broker";
+   protected String brokerName = "localhost";
    protected ActiveMQServer server;
 
    protected String tcpAmqpConnectionUri = "tcp://localhost:5672";

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/67a06588/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java
index e62ce12..85f6eca 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java
@@ -23,9 +23,12 @@ import java.util.HashSet;
 import java.util.Set;
 
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.JsonUtil;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientProducer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
@@ -33,12 +36,14 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.api.core.management.AddressControl;
 import org.apache.activemq.artemis.api.core.management.RoleInfo;
+import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.security.CheckType;
 import org.apache.activemq.artemis.core.security.Role;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.impl.QueueImpl;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.utils.Base64;
 import org.apache.activemq.artemis.utils.RandomUtil;
 import org.junit.Assert;
 import org.junit.Before;
@@ -310,6 +315,48 @@ public class AddressControlTest extends ManagementTestBase {
       assertEquals(RoutingType.ANYCAST.toString(), ((JsonString) jsonArray.get(0)).getString());
    }
 
+   @Test
+   public void testGetMessageCount() throws Exception {
+      SimpleString address = RandomUtil.randomSimpleString();
+      session.createAddress(address, RoutingType.ANYCAST, false);
+      // Verifies the address-level message count as queues are added; it is expected to start at zero.
+      AddressControl addressControl = createManagementControl(address);
+      assertEquals(0, addressControl.getMessageCount());
+      // While the address has no queue, a sent message is expected to leave the count at zero.
+      ClientProducer producer = session.createProducer(address.toString());
+      producer.send(session.createMessage(false));
+      assertEquals(0, addressControl.getMessageCount());
+      // Once a queue exists on the address, the next send is expected to raise the count to one.
+      session.createQueue(address, RoutingType.ANYCAST, address);
+      producer.send(session.createMessage(false));
+      assertEquals(1, addressControl.getMessageCount());
+      // After a second queue is added, one more send is expected to bring the total to two.
+      session.createQueue(address, RoutingType.ANYCAST, address.concat('2'));
+      producer.send(session.createMessage(false));
+      assertEquals(2, addressControl.getMessageCount());
+   }
+
+   @Test
+   public void testSendMessage() throws Exception {
+      SimpleString address = RandomUtil.randomSimpleString();
+      session.createAddress(address, RoutingType.ANYCAST, false);
+      // The address starts without queues; the queue created below must show up in the control.
+      AddressControl addressControl = createManagementControl(address);
+      Assert.assertEquals(0, addressControl.getQueueNames().length);
+      session.createQueue(address, RoutingType.ANYCAST, address);
+      Assert.assertEquals(1, addressControl.getQueueNames().length);
+      addressControl.sendMessage(null, Message.BYTES_TYPE, Base64.encodeBytes("test".getBytes(java.nio.charset.StandardCharsets.UTF_8)), false, null, null);
+      // The call above passed null headers; the message must still be counted on the address.
+      Assert.assertEquals(1, addressControl.getMessageCount());
+      // Consume with a core client and check the body equals the bytes that were Base64-encoded for the call.
+      ClientConsumer consumer = session.createConsumer(address);
+      ClientMessage message = consumer.receive(500);
+      assertNotNull(message);
+      byte[] buffer = new byte[message.getBodyBuffer().readableBytes()];
+      message.getBodyBuffer().readBytes(buffer);
+      assertEquals("test", new String(buffer, java.nio.charset.StandardCharsets.UTF_8));
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------