You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by an...@apache.org on 2017/09/08 11:45:26 UTC

[1/3] activemq-artemis git commit: This closes #1521 Amqpfqqn fixes

Repository: activemq-artemis
Updated Branches:
  refs/heads/master d414a1968 -> 9c31055ab


This closes #1521 Amqpfqqn fixes


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/9c31055a
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/9c31055a
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/9c31055a

Branch: refs/heads/master
Commit: 9c31055aba93ed2af7c2e84516d302b4c5151b79
Parents: d414a19 152791c
Author: Andy Taylor <an...@gmail.com>
Authored: Fri Sep 8 12:41:36 2017 +0100
Committer: Andy Taylor <an...@gmail.com>
Committed: Fri Sep 8 12:41:36 2017 +0100

----------------------------------------------------------------------
 .../amqp/proton/ProtonServerSenderContext.java  |  28 +++-
 .../amqp/AmqpFullyQualifiedNameTest.java        | 145 ++++++++++++++-----
 2 files changed, 125 insertions(+), 48 deletions(-)
----------------------------------------------------------------------



[3/3] activemq-artemis git commit: ARTEMIS-1392 Fix NPE when FQQN queue does not exist during multicast subscribe

Posted by an...@apache.org.
ARTEMIS-1392 Fix NPE when FQQN queue does not exist during multicast subscribe


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/32ac370e
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/32ac370e
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/32ac370e

Branch: refs/heads/master
Commit: 32ac370edce4b5232c8d0e3aa7c9df2fd8795766
Parents: d414a19
Author: Martyn Taylor <mt...@redhat.com>
Authored: Wed Sep 6 13:01:42 2017 +0100
Committer: Andy Taylor <an...@gmail.com>
Committed: Fri Sep 8 12:41:36 2017 +0100

----------------------------------------------------------------------
 .../amqp/proton/ProtonServerSenderContext.java  | 12 +++++-----
 .../amqp/AmqpFullyQualifiedNameTest.java        | 23 +++++++++++++++++++-
 2 files changed, 29 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/32ac370e/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 50d2ef4..d2a097c 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -22,6 +22,8 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
 import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
@@ -73,9 +75,6 @@ import org.apache.qpid.proton.engine.Link;
 import org.apache.qpid.proton.engine.Sender;
 import org.jboss.logging.Logger;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-
 /**
  * TODO: Merge {@link ProtonServerSenderContext} and {@link org.apache.activemq.artemis.protocol.amqp.client.ProtonClientSenderContext} once we support 'global' link names. The split is a workaround for outgoing links
  */
@@ -334,8 +333,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
             }
 
             if (queueNameToUse != null) {
-               SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, queueNameToUse, RoutingType.MULTICAST);
-               queue = matchingAnycastQueue.toString();
+               SimpleString matchingQueue = sessionSPI.getMatchingQueue(addressToUse, queueNameToUse, RoutingType.MULTICAST);
+               if (matchingQueue == null) {
+                  throw new ActiveMQAMQPNotFoundException("Queue: '" + queueNameToUse + "' does not exist");
+               }
+               queue = matchingQueue.toString();
             }
             //if the address specifies a broker configured queue then we always use this, treat it as a queue
             if (queue != null) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/32ac370e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFullyQualifiedNameTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFullyQualifiedNameTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFullyQualifiedNameTest.java
index 65b0f7f..eaca868 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFullyQualifiedNameTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFullyQualifiedNameTest.java
@@ -68,6 +68,27 @@ public class AmqpFullyQualifiedNameTest extends JMSClientTestSupport {
       server.getConfiguration().addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, new HashMap<String, Object>(), "netty", new HashMap<String, Object>()));
    }
 
+   @Test
+   public void testFQQNTopicWhenQueueDoesNotExist() throws Exception {
+      Exception e = null;
+      String queueName = "testQueue";
+
+      Connection connection = createConnection(false);
+      try {
+         connection.setClientID("FQQNconn");
+         connection.start();
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Topic topic = session.createTopic(multicastAddress.toString() + "::" + queueName);
+         session.createConsumer(topic);
+      } catch (InvalidDestinationException ide) {
+         e = ide;
+      } finally {
+         connection.close();
+      }
+      assertNotNull(e);
+      assertTrue(e.getMessage().contains("Queue: '" + queueName + "' does not exist"));
+   }
+
    @Test(timeout = 60000)
    //there isn't much use of FQQN for topics
    //however we can test query functionality
@@ -78,7 +99,7 @@ public class AmqpFullyQualifiedNameTest extends JMSClientTestSupport {
          connection.setClientID("FQQNconn");
          connection.start();
          Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         Topic topic = session.createTopic(multicastAddress.toString());
+         Topic topic = session.createTopic(multicastAddress.toString() + "::someaddress");
 
          MessageConsumer consumer1 = session.createConsumer(topic);
          MessageConsumer consumer2 = session.createConsumer(topic);


[2/3] activemq-artemis git commit: ARTEMIS-1402 AMQP notfound on unmatched FQQN

Posted by an...@apache.org.
ARTEMIS-1402 AMQP notfound on unmatched FQQN

Return an AMQP not:found error to the client, if the supplied queue in
an FQQN belongs to an address other than what is provided in the FQQN.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/152791c2
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/152791c2
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/152791c2

Branch: refs/heads/master
Commit: 152791c230bd9c2d79a190fc555772f32472426f
Parents: 32ac370
Author: Martyn Taylor <mt...@redhat.com>
Authored: Thu Sep 7 10:40:27 2017 +0100
Committer: Andy Taylor <an...@gmail.com>
Committed: Fri Sep 8 12:41:36 2017 +0100

----------------------------------------------------------------------
 .../amqp/proton/ProtonServerSenderContext.java  |  26 ++--
 .../amqp/AmqpFullyQualifiedNameTest.java        | 124 +++++++++++++------
 2 files changed, 102 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/152791c2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index d2a097c..fbaae8a 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -332,13 +332,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
                supportedFilters.put(filter.getKey(), filter.getValue());
             }
 
-            if (queueNameToUse != null) {
-               SimpleString matchingQueue = sessionSPI.getMatchingQueue(addressToUse, queueNameToUse, RoutingType.MULTICAST);
-               if (matchingQueue == null) {
-                  throw new ActiveMQAMQPNotFoundException("Queue: '" + queueNameToUse + "' does not exist");
-               }
-               queue = matchingQueue.toString();
-            }
+            queue = getMatchingQueue(queueNameToUse, addressToUse, RoutingType.MULTICAST);
+
             //if the address specifies a broker configured queue then we always use this, treat it as a queue
             if (queue != null) {
                multicast = false;
@@ -392,7 +387,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
             }
          } else {
             if (queueNameToUse != null) {
-               SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, queueNameToUse, RoutingType.ANYCAST);
+               SimpleString matchingAnycastQueue = SimpleString.toSimpleString(getMatchingQueue(queueNameToUse, addressToUse, RoutingType.ANYCAST));
                if (matchingAnycastQueue != null) {
                   queue = matchingAnycastQueue.toString();
                } else {
@@ -442,6 +437,21 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       }
    }
 
+   private String getMatchingQueue(SimpleString queueName, SimpleString address, RoutingType routingType) throws Exception {
+      if (queueName != null) {
+         QueueQueryResult result = sessionSPI.queueQuery(queueName.toString(), routingType, false);
+         if (!result.isExists()) {
+            throw new ActiveMQAMQPNotFoundException("Queue: '" + queueName + "' does not exist");
+         } else {
+            if (!result.getAddress().equals(address)) {
+               throw new ActiveMQAMQPNotFoundException("Queue: '" + queueName + "' does not exist for address '" + address + "'");
+            }
+            return sessionSPI.getMatchingQueue(address, queueName, routingType).toString();
+         }
+      }
+      return null;
+   }
+
    protected String getClientId() {
       return connection.getRemoteContainer();
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/152791c2/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFullyQualifiedNameTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFullyQualifiedNameTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFullyQualifiedNameTest.java
index eaca868..9bb68f0 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFullyQualifiedNameTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFullyQualifiedNameTest.java
@@ -89,22 +89,89 @@ public class AmqpFullyQualifiedNameTest extends JMSClientTestSupport {
       assertTrue(e.getMessage().contains("Queue: '" + queueName + "' does not exist"));
    }
 
+   @Test
+   public void testConsumeQueueToFQQNWrongQueueAttachedToAnotherAddress() throws Exception {
+
+      // Create 2 Queues: address1::queue1, address2::queue2
+      String address1 = "a1";
+      String address2 = "a2";
+      String queue1 = "q1";
+      String queue2 = "q2";
+
+      server.createQueue(SimpleString.toSimpleString(address1), RoutingType.ANYCAST, SimpleString.toSimpleString(queue1), null, true, false, -1, false, true);
+      server.createQueue(SimpleString.toSimpleString(address2), RoutingType.ANYCAST, SimpleString.toSimpleString(queue2), null, true, false, -1, false, true);
+
+      Exception e = null;
+
+      // Wrong FQQN.  Attempt to subscribe to a queue belonging to a different address than given in the FQQN.
+      String wrongFQQN = address1 + "::"  + queue2;
+      Connection connection = createConnection(false);
+      try {
+         connection.setClientID("FQQNconn");
+         connection.start();
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         javax.jms.Queue queue = session.createQueue(wrongFQQN);
+         session.createConsumer(queue);
+      } catch (InvalidDestinationException ide) {
+         e = ide;
+      } finally {
+         connection.close();
+      }
+      assertNotNull(e);
+      assertTrue(e.getMessage().contains("Queue: '" + queue2 + "' does not exist for address '" + address1 + "'"));
+   }
+
+   @Test
+   public void testSubscribeTopicToFQQNWrongQueueAttachedToAnotherAddress() throws Exception {
+
+      // Create 2 Queues: address1::queue1, address2::queue2
+      String address1 = "a1";
+      String address2 = "a2";
+      String queue1 = "q1";
+      String queue2 = "q2";
+
+      server.createQueue(SimpleString.toSimpleString(address1), RoutingType.MULTICAST, SimpleString.toSimpleString(queue1), null, true, false, -1, false, true);
+      server.createQueue(SimpleString.toSimpleString(address2), RoutingType.MULTICAST, SimpleString.toSimpleString(queue2), null, true, false, -1, false, true);
+
+      Exception e = null;
+
+      // Wrong FQQN.  Attempt to subscribe to a queue belonging to a different address than given in the FQQN.
+      String wrongFQQN = address1 + "::"  + queue2;
+      Connection connection = createConnection(false);
+      try {
+         connection.setClientID("FQQNconn");
+         connection.start();
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Topic topic = session.createTopic(wrongFQQN);
+         session.createConsumer(topic);
+      } catch (InvalidDestinationException ide) {
+         e = ide;
+      } finally {
+         connection.close();
+      }
+      assertNotNull(e);
+      assertTrue(e.getMessage().contains("Queue: '" + queue2 + "' does not exist for address '" + address1 + "'"));
+   }
+
    @Test(timeout = 60000)
    //there isn't much use of FQQN for topics
    //however we can test query functionality
    public void testTopic() throws Exception {
 
+      SimpleString queueName = new SimpleString("someAddress");
+      server.createQueue(multicastAddress, RoutingType.MULTICAST, queueName, null, false, false);
       Connection connection = createConnection(false);
+
       try {
          connection.setClientID("FQQNconn");
          connection.start();
          Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         Topic topic = session.createTopic(multicastAddress.toString() + "::someaddress");
+         Topic fqqn = session.createTopic(multicastAddress.toString() + "::" + queueName);
 
-         MessageConsumer consumer1 = session.createConsumer(topic);
-         MessageConsumer consumer2 = session.createConsumer(topic);
-         MessageConsumer consumer3 = session.createConsumer(topic);
+         MessageConsumer consumer1 = session.createConsumer(fqqn);
+         MessageConsumer consumer2 = session.createConsumer(fqqn);
 
+         Topic topic = session.createTopic(multicastAddress.toString());
          MessageProducer producer = session.createProducer(topic);
 
          producer.send(session.createMessage());
@@ -112,10 +179,10 @@ public class AmqpFullyQualifiedNameTest extends JMSClientTestSupport {
          //each consumer receives one
          Message m = consumer1.receive(2000);
          assertNotNull(m);
+
+         // Subscribing to FQQN is akin to shared subscription
          m = consumer2.receive(2000);
-         assertNotNull(m);
-         m = consumer3.receive(2000);
-         assertNotNull(m);
+         assertNull(m);
 
          Bindings bindings = server.getPostOffice().getBindingsForAddress(multicastAddress);
          for (Binding b : bindings.getBindings()) {
@@ -192,11 +259,16 @@ public class AmqpFullyQualifiedNameTest extends JMSClientTestSupport {
       }
    }
 
+   /**
+    * Broker should return exception if no address is passed in FQQN.
+    * @throws Exception
+    */
    @Test
    public void testQueueSpecial() throws Exception {
       server.createQueue(anycastAddress, RoutingType.ANYCAST, anycastQ1, null, true, false, -1, false, true);
 
       Connection connection = createConnection();
+      Exception expectedException = null;
       try {
          connection.start();
          Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -204,39 +276,11 @@ public class AmqpFullyQualifiedNameTest extends JMSClientTestSupport {
          //::queue ok!
          String specialName = CompositeAddress.toFullQN(new SimpleString(""), anycastQ1).toString();
          javax.jms.Queue q1 = session.createQueue(specialName);
-
-         ClientSessionFactory cf = createSessionFactory(locator);
-         ClientSession coreSession = cf.createSession();
-
-         ClientProducer coreProducer = coreSession.createProducer(anycastAddress);
-         sendMessages(coreSession, coreProducer, 1);
-
-         System.out.println("create consumer: " + q1);
-         MessageConsumer consumer1 = session.createConsumer(q1);
-
-         assertNotNull(consumer1.receive(2000));
-
-         //queue::
-         specialName = CompositeAddress.toFullQN(anycastQ1, new SimpleString("")).toString();
-         q1 = session.createQueue(specialName);
-         try {
-            session.createConsumer(q1);
-            fail("should get exception");
-         } catch (InvalidDestinationException e) {
-            //expected
-         }
-
-         //::
-         specialName = CompositeAddress.toFullQN(new SimpleString(""), new SimpleString("")).toString();
-         q1 = session.createQueue(specialName);
-         try {
-            session.createConsumer(q1);
-            fail("should get exception");
-         } catch (InvalidDestinationException e) {
-            //expected
-         }
-      } finally {
-         connection.close();
+         session.createConsumer(q1);
+      } catch (InvalidDestinationException e) {
+         expectedException = e;
       }
+      assertNotNull(expectedException);
+      assertTrue(expectedException.getMessage().contains("Queue: 'q1' does not exist for address ''"));
    }
 }