You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ni...@apache.org on 2018/10/31 10:04:47 UTC

activemq-artemis git commit: ARTEMIS-2100 address routing-type overridden on attaching AMQP sender

Repository: activemq-artemis
Updated Branches:
  refs/heads/2.6.x 89f357572 -> 2d0345b87


ARTEMIS-2100 address routing-type overridden on attaching AMQP sender

An already existing address routing type should be taken
into consideration while choosing which routing type to use when none
is configured.

(cherry picked from commit 1c17a4d59dc630a4ba2d3de5817dd33cbca5b431)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2d0345b8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2d0345b8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2d0345b8

Branch: refs/heads/2.6.x
Commit: 2d0345b873de98ef3fde3134f4987309b1006639
Parents: 89f3575
Author: Francesco Nigro <ni...@gmail.com>
Authored: Wed Oct 24 15:45:53 2018 +0200
Committer: Francesco Nigro <ni...@gmail.com>
Committed: Wed Oct 31 11:00:44 2018 +0100

----------------------------------------------------------------------
 .../proton/ProtonServerReceiverContext.java     |  13 ++-
 .../amqp/AmqpSenderRoutingTypeTest.java         | 116 +++++++++++++++++++
 2 files changed, 127 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2d0345b8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index 0758714..44b4152 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -19,11 +19,13 @@ package org.apache.activemq.artemis.protocol.amqp.proton;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.security.CheckType;
 import org.apache.activemq.artemis.core.security.SecurityAuth;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
@@ -229,8 +231,15 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
             }
          }
       }
-
-      return sessionSPI.getDefaultRoutingType(address);
+      final AddressInfo addressInfo = sessionSPI.getAddress(address);
+      if (addressInfo != null && !addressInfo.getRoutingTypes().isEmpty()) {
+         if (addressInfo.getRoutingTypes().size() == 1) {
+            return addressInfo.getRoutingType();
+         }
+      }
+      RoutingType defaultRoutingType = sessionSPI.getDefaultRoutingType(address);
+      defaultRoutingType = defaultRoutingType == null ? ActiveMQDefaultConfiguration.getDefaultRoutingType() : defaultRoutingType;
+      return defaultRoutingType;
    }
 
    /*

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2d0345b8/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderRoutingTypeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderRoutingTypeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderRoutingTypeTest.java
new file mode 100644
index 0000000..0d46798
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderRoutingTypeTest.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.JournalType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.hamcrest.CoreMatchers;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AmqpSenderRoutingTypeTest extends JMSClientTestSupport {
+
+   @Override
+   protected void configureAddressPolicy(ActiveMQServer server) {
+      Configuration serverConfig = server.getConfiguration();
+      serverConfig.setJournalType(JournalType.NIO);
+      Map<String, AddressSettings> map = serverConfig.getAddressesSettings();
+      if (map.size() == 0) {
+         AddressSettings as = new AddressSettings();
+         as.setDefaultAddressRoutingType(RoutingType.ANYCAST);
+         map.put("#", as);
+      }
+   }
+
+   @Override
+   protected String getConfiguredProtocols() {
+      return "AMQP,CORE";
+   }
+
+   @Test
+   public void testAMQPSenderHonourRoutingTypeOfExistingAddress() throws Exception {
+      RoutingType routingType = server.getConfiguration().getAddressesSettings().get("#").getDefaultAddressRoutingType();
+      Assert.assertEquals(RoutingType.ANYCAST, routingType);
+      try (ActiveMQConnection coreConnection = (ActiveMQConnection) createCoreConnection();
+           ClientSession clientSession = coreConnection.getSessionFactory().createSession()) {
+         RoutingType addressRoutingType = RoutingType.MULTICAST;
+         SimpleString address = SimpleString.toSimpleString("myTopic_" + UUID.randomUUID().toString());
+         clientSession.createAddress(address, addressRoutingType, false);
+         ClientSession.AddressQuery addressQuery = clientSession.addressQuery(address);
+         Assert.assertTrue(addressQuery.isExists());
+         Assert.assertTrue(addressQuery.getQueueNames().isEmpty());
+         AmqpClient client = createAmqpClient(guestUser, guestPass);
+         AmqpConnection connection = addConnection(client.connect());
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender(address.toString());
+         try {
+            ClientSession.QueueQuery queueQuery = clientSession.queueQuery(address);
+            Assert.assertFalse(queueQuery.isExists());
+            Assert.assertEquals(addressRoutingType, queueQuery.getRoutingType());
+         } finally {
+            sender.close();
+            session.close();
+            connection.close();
+         }
+      }
+
+   }
+
+   @Test
+   public void testAMQPSenderCreateQueueWithDefaultRoutingTypeIfAddressDoNotExist() throws Exception {
+      RoutingType defaultRoutingType = server.getConfiguration().getAddressesSettings().get("#").getDefaultAddressRoutingType();
+      Assert.assertEquals(RoutingType.ANYCAST, defaultRoutingType);
+      try (ActiveMQConnection coreConnection = (ActiveMQConnection) createCoreConnection();
+           ClientSession clientSession = coreConnection.getSessionFactory().createSession()) {
+         SimpleString address = SimpleString.toSimpleString("myTopic_" + UUID.randomUUID().toString());
+         ClientSession.AddressQuery addressQuery = clientSession.addressQuery(address);
+         Assert.assertFalse(addressQuery.isExists());
+         Assert.assertTrue(addressQuery.getQueueNames().isEmpty());
+         AmqpClient client = createAmqpClient(guestUser, guestPass);
+         AmqpConnection connection = addConnection(client.connect());
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender(address.toString());
+         try {
+            addressQuery = clientSession.addressQuery(address);
+            Assert.assertTrue(addressQuery.isExists());
+            Assert.assertThat(addressQuery.getQueueNames(), CoreMatchers.hasItem(address));
+            ClientSession.QueueQuery queueQuery = clientSession.queueQuery(address);
+            Assert.assertTrue(queueQuery.isExists());
+            Assert.assertEquals(defaultRoutingType, queueQuery.getRoutingType());
+         } finally {
+            sender.close();
+            session.close();
+            connection.close();
+         }
+      }
+
+   }
+}