You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/09/29 01:40:29 UTC
[1/2] activemq-artemis git commit: ARTEMIS-1352 auto-create MULTICAST
queue when AMQP client sends to topic
Repository: activemq-artemis
Updated Branches:
refs/heads/master 6fc51b10d -> 3035a57e4
ARTEMIS-1352 auto-create MULTICAST queue when AMQP client sends to topic
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/d1cd1e71
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/d1cd1e71
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/d1cd1e71
Branch: refs/heads/master
Commit: d1cd1e71a198927c5180a1a032d6796e82b86da8
Parents: 6fc51b1
Author: Justin Bertram <jb...@apache.org>
Authored: Wed Sep 6 09:07:21 2017 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Sep 28 21:40:18 2017 -0400
----------------------------------------------------------------------
.../amqp/broker/AMQPSessionCallback.java | 23 ++++++++++-----
.../proton/ProtonServerReceiverContext.java | 15 ++++++----
.../integration/amqp/AmqpMaxFrameSizeTest.java | 3 ++
.../tests/integration/client/ConsumerTest.java | 30 ++++++++++++++++++++
4 files changed, 58 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d1cd1e71/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 6807ada..21afbf9 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -21,6 +21,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.Message;
@@ -276,15 +277,23 @@ public class AMQPSessionCallback implements SessionCallback {
return queueQueryResult;
}
- public boolean bindingQuery(String address) throws Exception {
- BindingQueryResult bindingQueryResult = serverSession.executeBindingQuery(SimpleString.toSimpleString(address));
- if (!bindingQueryResult.isExists() && bindingQueryResult.isAutoCreateQueues()) {
+ public boolean bindingQuery(String address, RoutingType routingType) throws Exception {
+ SimpleString simpleAddress = SimpleString.toSimpleString(address);
+ BindingQueryResult bindingQueryResult = serverSession.executeBindingQuery(simpleAddress);
+ if (routingType == RoutingType.MULTICAST && !bindingQueryResult.isExists() && bindingQueryResult.isAutoCreateAddresses()) {
try {
- serverSession.createQueue(new SimpleString(address), new SimpleString(address), RoutingType.ANYCAST, null, false, true);
+ serverSession.createAddress(simpleAddress, routingType, true);
+ } catch (ActiveMQAddressExistsException e) {
+ // The address may have been created by another thread in the mean time. Catch and do nothing.
+ }
+ bindingQueryResult = serverSession.executeBindingQuery(simpleAddress);
+ } else if (routingType == RoutingType.ANYCAST && !bindingQueryResult.isExists() && bindingQueryResult.isAutoCreateQueues()) {
+ try {
+ serverSession.createQueue(simpleAddress, simpleAddress, routingType, null, false, true);
} catch (ActiveMQQueueExistsException e) {
// The queue may have been created by another thread in the mean time. Catch and do nothing.
}
- bindingQueryResult = serverSession.executeBindingQuery(SimpleString.toSimpleString(address));
+ bindingQueryResult = serverSession.executeBindingQuery(simpleAddress);
}
return bindingQueryResult.isExists();
}
@@ -406,7 +415,7 @@ public class AMQPSessionCallback implements SessionCallback {
return;
}
- if (!bindingQuery(message.getAddress().toString())) {
+ if (!bindingQuery(message.getAddress().toString(), RoutingType.ANYCAST)) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
}
}
@@ -660,7 +669,7 @@ public class AMQPSessionCallback implements SessionCallback {
}
public RoutingType getDefaultRoutingType(String address) {
- return manager.getServer().getAddressSettingsRepository().getMatch(address).getDefaultQueueRoutingType();
+ return manager.getServer().getAddressSettingsRepository().getMatch(address).getDefaultAddressRoutingType();
}
public void check(SimpleString address, CheckType checkType, SecurityAuth session) throws Exception {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d1cd1e71/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index 2feb8da..eee35a6 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -119,12 +119,13 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
if (address != null && !address.isEmpty()) {
try {
- if (!sessionSPI.bindingQuery(address)) {
+ if (!sessionSPI.bindingQuery(address, getRoutingType(target.getCapabilities()))) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
}
} catch (ActiveMQAMQPNotFoundException e) {
throw e;
} catch (Exception e) {
+ e.printStackTrace();
throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
}
@@ -177,11 +178,13 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
}
private RoutingType getRoutingType(Symbol[] symbols) {
- for (Symbol symbol : symbols) {
- if (AmqpSupport.TEMP_TOPIC_CAPABILITY.equals(symbol) || AmqpSupport.TOPIC_CAPABILITY.equals(symbol)) {
- return RoutingType.MULTICAST;
- } else if (AmqpSupport.TEMP_QUEUE_CAPABILITY.equals(symbol) || AmqpSupport.QUEUE_CAPABILITY.equals(symbol)) {
- return RoutingType.ANYCAST;
+ if (symbols != null) {
+ for (Symbol symbol : symbols) {
+ if (AmqpSupport.TEMP_TOPIC_CAPABILITY.equals(symbol) || AmqpSupport.TOPIC_CAPABILITY.equals(symbol)) {
+ return RoutingType.MULTICAST;
+ } else if (AmqpSupport.TEMP_QUEUE_CAPABILITY.equals(symbol) || AmqpSupport.QUEUE_CAPABILITY.equals(symbol)) {
+ return RoutingType.ANYCAST;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d1cd1e71/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMaxFrameSizeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMaxFrameSizeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMaxFrameSizeTest.java
index 778cd40..c6e1008 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMaxFrameSizeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMaxFrameSizeTest.java
@@ -19,6 +19,8 @@ package org.apache.activemq.artemis.tests.integration.amqp;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
@@ -40,6 +42,7 @@ public class AmqpMaxFrameSizeTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testMultipleTransfers() throws Exception {
+ server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
String testQueueName = "ConnectionFrameSize";
int nMsgs = 200;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d1cd1e71/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
index 69c7f68..fc474aa 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
@@ -231,6 +231,36 @@ public class ConsumerTest extends ActiveMQTestBase {
}
@Test
+ public void testAutoCreateMulticastAddress() throws Throwable {
+ if (!isNetty()) {
+ // no need to run the test, there's no AMQP support
+ return;
+ }
+
+ assertNull(server.getAddressInfo(SimpleString.toSimpleString("topic")));
+
+ ConnectionFactory factorySend = createFactory(2);
+ Connection connection = factorySend.createConnection();
+
+ try {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ javax.jms.Topic topic = session.createTopic("topic");
+ MessageProducer producer = session.createProducer(topic);
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ TextMessage msg = session.createTextMessage("hello");
+ msg.setIntProperty("mycount", 0);
+ producer.send(msg);
+ } finally {
+ connection.close();
+ }
+
+ assertNotNull(server.getAddressInfo(SimpleString.toSimpleString("topic")));
+ assertEquals(RoutingType.MULTICAST, server.getAddressInfo(SimpleString.toSimpleString("topic")).getRoutingType());
+ assertEquals(0, server.getTotalMessageCount());
+ }
+
+ @Test
public void testSendCoreReceiveAMQP() throws Throwable {
if (!isNetty()) {
[2/2] activemq-artemis git commit: This closes #1558
Posted by cl...@apache.org.
This closes #1558
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3035a57e
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3035a57e
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3035a57e
Branch: refs/heads/master
Commit: 3035a57e4859d09ea27f1b3a453408e56e30cfb1
Parents: 6fc51b1 d1cd1e7
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Sep 28 21:40:19 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Sep 28 21:40:19 2017 -0400
----------------------------------------------------------------------
.../amqp/broker/AMQPSessionCallback.java | 23 ++++++++++-----
.../proton/ProtonServerReceiverContext.java | 15 ++++++----
.../integration/amqp/AmqpMaxFrameSizeTest.java | 3 ++
.../tests/integration/client/ConsumerTest.java | 30 ++++++++++++++++++++
4 files changed, 58 insertions(+), 13 deletions(-)
----------------------------------------------------------------------