You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/11/01 12:34:39 UTC
[1/3] activemq-artemis git commit: MQTT Handle ANYCAST addresses
Repository: activemq-artemis
Updated Branches:
refs/heads/ARTEMIS-780 eaa61333b -> 5f428e64a
MQTT Handle ANYCAST addresses
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/5f428e64
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/5f428e64
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/5f428e64
Branch: refs/heads/ARTEMIS-780
Commit: 5f428e64a8e1f3c2a0128ab7937f137e88a9836c
Parents: d43c576
Author: Martyn Taylor <mt...@redhat.com>
Authored: Tue Nov 1 12:28:06 2016 +0000
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Tue Nov 1 12:34:19 2016 +0000
----------------------------------------------------------------------
.../artemis/api/core/ActiveMQExceptionType.java | 6 ++++
...ActiveMQUnexpectedRoutingTypeForAddress.java | 31 ++++++++++++++++++++
.../protocol/mqtt/MQTTSubscriptionManager.java | 17 ++++++++---
.../core/server/ActiveMQMessageBundle.java | 5 ++++
.../integration/mqtt/imported/MQTTTest.java | 24 +++++++++++++++
5 files changed, 79 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5f428e64/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
index 0221562..309a8c4 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
@@ -219,6 +219,12 @@ public enum ActiveMQExceptionType {
public ActiveMQException createException(String msg) {
return new ActiveMQQueueMaxConsumerLimitReached(msg);
}
+ },
+ UNEXPECTED_ROUTING_TYPE_FOR_ADDRESS(215) {
+ @Override
+ public ActiveMQException createException(String msg) {
+ return new ActiveMQUnexpectedRoutingTypeForAddress(msg);
+ }
};
private static final Map<Integer, ActiveMQExceptionType> TYPE_MAP;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5f428e64/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQUnexpectedRoutingTypeForAddress.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQUnexpectedRoutingTypeForAddress.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQUnexpectedRoutingTypeForAddress.java
new file mode 100644
index 0000000..1bd7ecd
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQUnexpectedRoutingTypeForAddress.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.api.core;
+
+/**
+ * An operation failed because the address does not have the routing type the operation requires (for example an MQTT subscription, which expects MULTICAST, made against an ANYCAST address).
+ */
+public final class ActiveMQUnexpectedRoutingTypeForAddress extends ActiveMQException {
+
+ public ActiveMQUnexpectedRoutingTypeForAddress() { // no detail message
+ super(ActiveMQExceptionType.UNEXPECTED_ROUTING_TYPE_FOR_ADDRESS); // the type whose createException() builds this class, not the max-consumer type
+ }
+
+ public ActiveMQUnexpectedRoutingTypeForAddress(String msg) { // msg: human-readable description passed on to ActiveMQException
+ super(ActiveMQExceptionType.UNEXPECTED_ROUTING_TYPE_FOR_ADDRESS, msg);
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5f428e64/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
index d894910..a264e88 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -25,8 +25,10 @@ import java.util.concurrent.ConcurrentMap;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import org.apache.activemq.artemis.api.core.FilterConstants;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
public class MQTTSubscriptionManager {
@@ -61,7 +63,8 @@ public class MQTTSubscriptionManager {
synchronized void start() throws Exception {
for (MqttTopicSubscription subscription : session.getSessionState().getSubscriptions()) {
- Queue q = createQueueForSubscription(subscription.topicName(), subscription.qualityOfService().value());
+ String coreAddress = MQTTUtil.convertMQTTAddressFilterToCore(subscription.topicName());
+ Queue q = createQueueForSubscription(coreAddress, subscription.qualityOfService().value());
createConsumerForSubscriptionQueue(q, subscription.topicName(), subscription.qualityOfService().value());
}
}
@@ -84,8 +87,8 @@ public class MQTTSubscriptionManager {
/**
* Creates a Queue if it doesn't already exist, based on a topic and address. Returning the queue name.
*/
- private Queue createQueueForSubscription(String topic, int qos) throws Exception {
- String address = MQTTUtil.convertMQTTAddressFilterToCore(topic);
+ private Queue createQueueForSubscription(String address, int qos) throws Exception {
+
SimpleString queue = getQueueNameForTopic(address);
Queue q = session.getServer().locateQueue(queue);
@@ -113,9 +116,15 @@ public class MQTTSubscriptionManager {
int qos = subscription.qualityOfService().value();
String topic = subscription.topicName();
+ String coreAddress = MQTTUtil.convertMQTTAddressFilterToCore(topic);
+ AddressInfo addressInfo = session.getServer().getAddressInfo(new SimpleString(coreAddress));
+ if (addressInfo.getRoutingType() != AddressInfo.RoutingType.MULTICAST) {
+ throw ActiveMQMessageBundle.BUNDLE.unexpectedRoutingTypeForAddress(new SimpleString(coreAddress), AddressInfo.RoutingType.MULTICAST, addressInfo.getRoutingType());
+ }
+
session.getSessionState().addSubscription(subscription);
- Queue q = createQueueForSubscription(topic, qos);
+ Queue q = createQueueForSubscription(coreAddress, qos);
if (s == null) {
createConsumerForSubscriptionQueue(q, topic, qos);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5f428e64/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
index 769d183..9475461 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
@@ -35,11 +35,13 @@ import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.ActiveMQSessionCreationException;
+import org.apache.activemq.artemis.api.core.ActiveMQUnexpectedRoutingTypeForAddress;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
import org.apache.activemq.artemis.core.security.CheckType;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.jboss.logging.Messages;
import org.jboss.logging.annotations.Cause;
import org.jboss.logging.annotations.Message;
@@ -381,4 +383,7 @@ public interface ActiveMQMessageBundle {
@Message(id = 119200, value = "Maximum Consumer Limit Reached on Queue:(address={0},queue={1})", format = Message.Format.MESSAGE_FORMAT)
ActiveMQQueueMaxConsumerLimitReached maxConsumerLimitReachedForQueue(SimpleString address, SimpleString queueName);
+
+ @Message(id = 119201, value = "Expected Routing Type {1} but found {2} for address {0}", format = Message.Format.MESSAGE_FORMAT)
+ ActiveMQUnexpectedRoutingTypeForAddress unexpectedRoutingTypeForAddress(SimpleString address, AddressInfo.RoutingType expectedRoutingType, AddressInfo.RoutingType actualRoutingType); // placeholders {0}, {1}, {2} refer to the parameters in declaration order: address, expected, actual
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5f428e64/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
index 6406955..dd0098a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
@@ -1639,4 +1639,28 @@ public class MQTTTest extends MQTTTestSupport {
assertNotNull(peerDisconnectedException);
assertTrue(peerDisconnectedException.getMessage().contains("Peer disconnected"));
}
+
+ @Test(timeout = 60 * 1000)
+ public void testClientDisconnectedWhenTryingToSubscribeToAnAnycastAddress() throws Exception { // expects an MQTT subscribe against an ANYCAST address to end with the client being disconnected
+ Exception peerDisconnectedException = null;
+ try {
+ SimpleString coreAddress = new SimpleString("foo.bar"); // presumably the core-address form of the MQTT topic "foo/bar" used below - confirm against MQTTUtil
+ Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)};
+
+ AddressInfo addressInfo = new AddressInfo(coreAddress);
+ addressInfo.setRoutingType(AddressInfo.RoutingType.ANYCAST); // ANYCAST on purpose: MQTTSubscriptionManager throws for any routing type other than MULTICAST
+ getServer().createOrUpdateAddressInfo(addressInfo); // register the address before the client connects
+
+ MQTT mqtt = createMQTTConnection();
+ mqtt.setClientId("test-mqtt");
+ mqtt.setKeepAlive((short) 2);
+ final BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+ connection.subscribe(mqttSubscription); // expected to fail with EOFException rather than return
+ } catch (EOFException e) {
+ peerDisconnectedException = e; // keep it so the assertions below can inspect the message
+ }
+ assertNotNull(peerDisconnectedException); // still null (test fails) if subscribe() did not throw EOFException
+ assertTrue(peerDisconnectedException.getMessage().contains("Peer disconnected"));
+ }
}
[3/3] activemq-artemis git commit: fix up checkstyle issues
Posted by ma...@apache.org.
fix up checkstyle issues
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/fc11b3b1
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/fc11b3b1
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/fc11b3b1
Branch: refs/heads/ARTEMIS-780
Commit: fc11b3b12c020d31811e47137a470d69f5c69120
Parents: eaa6133
Author: Martyn Taylor <mt...@redhat.com>
Authored: Tue Nov 1 11:40:09 2016 +0000
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Tue Nov 1 12:34:19 2016 +0000
----------------------------------------------------------------------
.../artemis/core/journal/impl/JournalImpl.java | 6 +++---
.../journal/codec/PersistentQueueBindingEncoding.java | 3 +--
.../activemq/artemis/core/server/impl/QueueImpl.java | 5 +----
.../activemq/artemis/tools/migrate/config/Main.java | 3 +--
.../migrate/config/XMLConfigurationMigration.java | 13 +++++--------
.../tests/integration/addressing/AddressingTest.java | 8 +++-----
6 files changed, 14 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fc11b3b1/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index b1093ed..c3d3a87 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -48,8 +48,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
-import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
@@ -866,7 +866,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
private static SimpleFuture<Boolean> newSyncAndCallbackResult(boolean sync, IOCompletion callback) {
- return (sync && callback == null) ? new SimpleFuture<>() : null;
+ return (sync && callback == null) ? new SimpleFuture<Boolean>() : null;
}
@Override
@@ -2227,7 +2227,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
});
- threadPool = new ThreadPoolExecutor(0,Integer.MAX_VALUE, 60L,TimeUnit.SECONDS, new SynchronousQueue<>(), factory);
+ threadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L,TimeUnit.SECONDS, new SynchronousQueue(), factory);
ioExecutorFactory = new OrderedExecutorFactory(threadPool);
} else {
ioExecutorFactory = providedIOThreadPool;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fc11b3b1/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
index 78a81ea..169cd7d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
@@ -176,8 +176,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
if (buffer.readableBytes() > 0) {
maxConsumers = buffer.readInt();
deleteOnNoConsumers = buffer.readBoolean();
- }
- else {
+ } else {
maxConsumers = -1;
deleteOnNoConsumers = false;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fc11b3b1/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 6887637..ebad25d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -41,7 +41,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
@@ -54,7 +53,6 @@ import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
import org.apache.activemq.artemis.core.persistence.QueueStatus;
import org.apache.activemq.artemis.core.persistence.StorageManager;
-import org.apache.activemq.artemis.core.postoffice.AddressManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
@@ -827,8 +825,7 @@ public class QueueImpl implements Queue {
if (noConsumers.decrementAndGet() == 0 && deleteOnNoConsumers) {
try {
deleteQueue();
- }
- catch (Exception e) {
+ } catch (Exception e) {
logger.error("Error deleting queue on no consumers. " + this.toString(), e);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fc11b3b1/artemis-tools/src/main/java/org/apache/activemq/artemis/tools/migrate/config/Main.java
----------------------------------------------------------------------
diff --git a/artemis-tools/src/main/java/org/apache/activemq/artemis/tools/migrate/config/Main.java b/artemis-tools/src/main/java/org/apache/activemq/artemis/tools/migrate/config/Main.java
index c45d92a..94d4d53 100644
--- a/artemis-tools/src/main/java/org/apache/activemq/artemis/tools/migrate/config/Main.java
+++ b/artemis-tools/src/main/java/org/apache/activemq/artemis/tools/migrate/config/Main.java
@@ -37,8 +37,7 @@ public class Main {
} else {
try {
XMLConfigurationMigration migration = new XMLConfigurationMigration(input, new File(args[1]));
- }
- catch (Exception e) {
+ } catch (Exception e) {
// Unable to process file, move on.
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fc11b3b1/artemis-tools/src/main/java/org/apache/activemq/artemis/tools/migrate/config/XMLConfigurationMigration.java
----------------------------------------------------------------------
diff --git a/artemis-tools/src/main/java/org/apache/activemq/artemis/tools/migrate/config/XMLConfigurationMigration.java b/artemis-tools/src/main/java/org/apache/activemq/artemis/tools/migrate/config/XMLConfigurationMigration.java
index f5811a6..4e47999 100644
--- a/artemis-tools/src/main/java/org/apache/activemq/artemis/tools/migrate/config/XMLConfigurationMigration.java
+++ b/artemis-tools/src/main/java/org/apache/activemq/artemis/tools/migrate/config/XMLConfigurationMigration.java
@@ -31,7 +31,6 @@ import javax.xml.xpath.XPathFactory;
import java.io.File;
import java.util.Collection;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -111,8 +110,7 @@ public class XMLConfigurationMigration {
if (coreElement == null) {
throw new Exception("Not a artemis config");
}
- }
- catch (Exception e) {
+ } catch (Exception e) {
throw new Exception(e);
}
}
@@ -194,8 +192,7 @@ public class XMLConfigurationMigration {
Address address;
if (jmsQueueAddresses.containsKey(name)) {
address = jmsQueueAddresses.get(name);
- }
- else {
+ } else {
address = new Address();
address.setName(name);
address.setRoutingType("anycast");
@@ -217,8 +214,7 @@ public class XMLConfigurationMigration {
Address address;
if (jmsTopicAddresses.containsKey(name)) {
address = jmsTopicAddresses.get(name);
- }
- else {
+ } else {
address = new Address();
address.setName(name);
address.setRoutingType("multicast");
@@ -247,7 +243,8 @@ public class XMLConfigurationMigration {
}
private void writeAddressListToDoc(String comment, Collection<Address> addresses, Node addressElement) {
- if (addresses.isEmpty()) return;
+ if (addresses.isEmpty())
+ return;
addressElement.appendChild(document.createComment("=================="));
addressElement.appendChild(document.createComment(comment));
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fc11b3b1/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java
index a21a62b..c2004e7 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java
@@ -22,7 +22,6 @@ import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
@@ -159,7 +158,7 @@ public class AddressingTest extends ActiveMQTestBase {
ClientConsumer consumer1 = session.createConsumer(q1.getName());
ClientConsumer consumer2 = session.createConsumer(q2.getName());
ClientConsumer consumer3 = session.createConsumer(q3.getName());
- List<ClientConsumer> consumers = new ArrayList<>(Arrays.asList(new ClientConsumer[] {consumer1, consumer2, consumer3}));
+ List<ClientConsumer> consumers = new ArrayList<>(Arrays.asList(new ClientConsumer[]{consumer1, consumer2, consumer3}));
List<String> messages = new ArrayList<>();
messages.add("Message1");
@@ -272,9 +271,8 @@ public class AddressingTest extends ActiveMQTestBase {
ClientSession session = sessionFactory.createSession();
session.start();
- ClientConsumer consumer1 = session.createConsumer(q1.getName());
- }
- catch (ActiveMQQueueMaxConsumerLimitReached e) {
+ session.createConsumer(q1.getName());
+ } catch (ActiveMQQueueMaxConsumerLimitReached e) {
expectedException = e;
}
[2/3] activemq-artemis git commit: Add MQTT Test for MaxConsumer threshold
Posted by ma...@apache.org.
Add MQTT Test for MaxConsumer threshold
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/d43c5769
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/d43c5769
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/d43c5769
Branch: refs/heads/ARTEMIS-780
Commit: d43c5769208fcc81c9b9cf3ee590ac2af0b40e3e
Parents: fc11b3b
Author: Martyn Taylor <mt...@redhat.com>
Authored: Tue Nov 1 11:40:29 2016 +0000
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Tue Nov 1 12:34:19 2016 +0000
----------------------------------------------------------------------
.../integration/mqtt/imported/MQTTTest.java | 27 ++++++++++++++++++++
.../mqtt/imported/MQTTTestSupport.java | 4 +++
2 files changed, 31 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d43c5769/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
index b809df0..6406955 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
@@ -22,6 +22,7 @@ import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
+import java.io.EOFException;
import java.lang.reflect.Field;
import java.net.ProtocolException;
import java.util.ArrayList;
@@ -34,8 +35,10 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
+import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.tests.integration.mqtt.imported.util.Wait;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
@@ -1612,4 +1615,28 @@ public class MQTTTest extends MQTTTestSupport {
connection.disconnect();
}
+
+ @Test(timeout = 60 * 1000)
+ public void testClientDisconnectedOnMaxConsumerLimitReached() throws Exception { // expects an MQTT subscribe that exceeds the address's consumer limit to end with the client being disconnected
+ Exception peerDisconnectedException = null;
+ try {
+ SimpleString coreAddress = new SimpleString("foo.bar"); // presumably the core-address form of the MQTT topic "foo/bar" used below - confirm against MQTTUtil
+ Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)};
+
+ AddressInfo addressInfo = new AddressInfo(coreAddress);
+ addressInfo.setDefaultMaxConsumers(0); // default limit of zero consumers; per the test name the subscription below should hit that limit
+ getServer().createOrUpdateAddressInfo(addressInfo); // register the address before the client connects
+
+ MQTT mqtt = createMQTTConnection();
+ mqtt.setClientId("test-mqtt");
+ mqtt.setKeepAlive((short) 2);
+ final BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+ connection.subscribe(mqttSubscription); // expected to fail with EOFException rather than return
+ } catch (EOFException e) {
+ peerDisconnectedException = e; // keep it so the assertions below can inspect the message
+ }
+ assertNotNull(peerDisconnectedException);
+ assertTrue(peerDisconnectedException.getMessage().contains("Peer disconnected"));
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d43c5769/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
index 27ebde0..15cb8b6 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
@@ -92,6 +92,10 @@ public class MQTTTestSupport extends ActiveMQTestBase {
return name.getMethodName();
}
+ public ActiveMQServer getServer() { // returns this support class's server field; MQTTTest uses it to call createOrUpdateAddressInfo before connecting a client
+ return server;
+ }
+
@Override
@Before
public void setUp() throws Exception {