You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/12/09 19:49:26 UTC
[42/50] [abbrv] activemq-artemis git commit: ARTEMIS-789 Fix Failing
Tests
ARTEMIS-789 Fix Failing Tests
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/683ae689
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/683ae689
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/683ae689
Branch: refs/heads/master
Commit: 683ae6898936056faedb228b8e2b89cf91df1c1e
Parents: 61aec1b
Author: jbertram <jb...@apache.org>
Authored: Wed Nov 30 08:19:32 2016 -0600
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Fri Dec 9 18:43:15 2016 +0000
----------------------------------------------------------------------
.../core/client/impl/ClientSessionImpl.java | 2 +-
.../jms/client/ActiveMQMessageProducer.java | 6 +-
.../artemis/jms/client/ActiveMQSession.java | 29 ++-
.../core/protocol/mqtt/MQTTPublishManager.java | 16 +-
.../protocol/mqtt/MQTTSubscriptionManager.java | 2 +-
.../codec/PersistentAddressBindingEncoding.java | 4 +
.../core/server/impl/ActiveMQServerImpl.java | 19 +-
.../addressing/AddressConfigTest.java | 2 +-
.../integration/addressing/AnycastTest.java | 188 +++++++++++++++++++
.../client/JmsNettyNioStressTest.java | 8 +-
.../clientcrash/PendingDeliveriesTest.java | 3 +-
.../jms/cluster/JMSFailoverTest.java | 7 +-
.../integration/mqtt/imported/MQTTTest.java | 11 +-
.../openwire/SimpleOpenWireTest.java | 3 +-
.../integration/server/ResourceLimitTest.java | 2 +-
.../activemq/artemis/common/AbstractAdmin.java | 2 +-
16 files changed, 262 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/683ae689/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
index dd10e5b..f1b9cef 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
@@ -535,7 +535,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
queueName,
routingType,
null,
- true,
+ false,
false,
ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(),
ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(),
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/683ae689/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
index 4c1d335..b814bc2 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
@@ -36,6 +36,7 @@ import javax.jms.TopicPublisher;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
+import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
@@ -421,7 +422,10 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
} else {
connection.addKnownDestination(address);
}
- } catch (ActiveMQException e) {
+ } catch (ActiveMQQueueExistsException e) {
+ // The queue was created by another client/admin between the query check and send create queue packet
+ }
+ catch (ActiveMQException e) {
throw JMSExceptionHelper.convertFromActiveMQException(e);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/683ae689/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
index 3e9b76f..a25215e 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
@@ -301,15 +301,21 @@ public class ActiveMQSession implements QueueSession, TopicSession {
ClientSession.AddressQuery response = session.addressQuery(jbd.getSimpleAddress());
if (!response.isExists()) {
- if (jbd.isQueue() && response.isAutoCreateJmsQueues()) {
- // perhaps just relying on the broker to do it is simplest (i.e. deleteOnNoConsumers)
- session.createAddress(jbd.getSimpleAddress(), RoutingType.ANYCAST, true);
- session.createQueue(jbd.getSimpleAddress(), RoutingType.ANYCAST, jbd.getSimpleAddress(), null, true, true);
- } else if (!jbd.isQueue() && response.isAutoCreateJmsTopics()) {
- session.createAddress(jbd.getSimpleAddress(), RoutingType.MULTICAST, true);
- } else {
- throw new InvalidDestinationException("Destination " + jbd.getName() + " does not exist");
+ try {
+ if (jbd.isQueue() && response.isAutoCreateJmsQueues()) {
+ // perhaps just relying on the broker to do it is simplest (i.e. deleteOnNoConsumers)
+ session.createAddress(jbd.getSimpleAddress(), RoutingType.ANYCAST, true);
+ session.createQueue(jbd.getSimpleAddress(), RoutingType.ANYCAST, jbd.getSimpleAddress(), null, true, true);
+ } else if (!jbd.isQueue() && response.isAutoCreateJmsTopics()) {
+ session.createAddress(jbd.getSimpleAddress(), RoutingType.MULTICAST, true);
+ } else {
+ throw new InvalidDestinationException("Destination " + jbd.getName() + " does not exist");
+ }
+ }
+ catch (ActiveMQQueueExistsException e) {
+ // Queue was created between our query and create queue request. Ignore.
}
+
}
}
@@ -647,7 +653,12 @@ public class ActiveMQSession implements QueueSession, TopicSession {
*/
if (!response.isExists() || !response.getQueueNames().contains(dest.getSimpleAddress())) {
if (response.isAutoCreateJmsQueues()) {
- session.createQueue(dest.getSimpleAddress(), RoutingType.ANYCAST, dest.getSimpleAddress(), null, true, true);
+ try {
+ session.createQueue(dest.getSimpleAddress(), RoutingType.ANYCAST, dest.getSimpleAddress(), null, true, true);
+ }
+ catch (ActiveMQQueueExistsException e) {
+ // The queue was created by another client/admin between the query check and send create queue packet
+ }
} else {
throw new InvalidDestinationException("Destination " + dest.getName() + " does not exist");
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/683ae689/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index 3a2ad7e..c266e76 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -83,7 +83,7 @@ public class MQTTPublishManager {
}
private void createManagementAddress() {
- managementAddress = new SimpleString(MANAGEMENT_QUEUE_PREFIX + state.getClientId());
+ managementAddress = new SimpleString(MANAGEMENT_QUEUE_PREFIX + state.getClientId());
}
private void createManagementQueue() throws Exception {
@@ -113,10 +113,13 @@ public class MQTTPublishManager {
if (qos == 0) {
sendServerMessage((int) message.getMessageID(), (ServerMessageImpl) message, deliveryCount, qos);
session.getServerSession().acknowledge(consumer.getID(), message.getMessageID());
- } else {
+ } else if (qos == 1 || qos == 2) {
int mqttid = outboundStore.generateMqttId(message.getMessageID(), consumer.getID());
outboundStore.publish(mqttid, message.getMessageID(), consumer.getID());
sendServerMessage(mqttid, (ServerMessageImpl) message, deliveryCount, qos);
+ } else {
+ // Client must have disconnected and it's Subscription QoS cleared
+ consumer.individualCancel(message.getMessageID(), false);
}
}
}
@@ -231,7 +234,14 @@ public class MQTTPublishManager {
}
private int decideQoS(ServerMessage message, ServerConsumer consumer) {
- int subscriptionQoS = session.getSubscriptionManager().getConsumerQoSLevels().get(consumer.getID());
+
+ int subscriptionQoS = -1;
+ try {
+ subscriptionQoS = session.getSubscriptionManager().getConsumerQoSLevels().get(consumer.getID());
+ } catch (NullPointerException e) {
+ // This can happen if the client disconnected during a server send.
+ return subscriptionQoS;
+ }
int qos = 2;
if (message.containsProperty(MQTTUtil.MQTT_QOS_LEVEL_KEY)) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/683ae689/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
index b3542d3..c4b8b94 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -94,7 +94,7 @@ public class MQTTSubscriptionManager {
Queue q = session.getServer().locateQueue(queue);
if (q == null) {
- q = session.getServerSession().createQueue(new SimpleString(address), queue, RoutingType.MULTICAST, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0, true);
+ q = session.getServerSession().createQueue(new SimpleString(address), queue, RoutingType.MULTICAST, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0, false);
} else {
if (q.isDeleteOnNoConsumers()) {
throw ActiveMQMessageBundle.BUNDLE.invalidQueueConfiguration(q.getAddress(), q.getName(), "deleteOnNoConsumers", false, true);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/683ae689/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java
index c3aa9de..9684481 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java
@@ -48,6 +48,10 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres
for (RoutingType routingType : routingTypes) {
sb.append(routingType.toString() + ",");
}
+ if (sb.charAt(sb.length() - 1) == ',') {
+ sb.deleteCharAt(sb.length() - 1);
+ }
+ sb.append("}");
sb.append(", autoCreated=" + autoCreated + "]");
return sb.toString();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/683ae689/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 06852ce..abcbb89 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -2314,16 +2314,16 @@ public class ActiveMQServerImpl implements ActiveMQServer {
recoverStoredConfigs();
+ Map<Long, AddressBindingInfo> addressBindingInfosMap = new HashMap<>();
+
+ journalLoader.initAddresses(addressBindingInfosMap, addressBindingInfos);
+
Map<Long, QueueBindingInfo> queueBindingInfosMap = new HashMap<>();
journalLoader.initQueues(queueBindingInfosMap, queueBindingInfos);
journalLoader.handleGroupingBindings(groupingInfos);
- Map<Long, AddressBindingInfo> addressBindingInfosMap = new HashMap<>();
-
- journalLoader.initAddresses(addressBindingInfosMap, addressBindingInfos);
-
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap = new HashMap<>();
HashSet<Pair<Long, Long>> pendingLargeMessages = new HashSet<>();
@@ -2473,10 +2473,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
defaultAddressInfo.addRoutingType(routingType == null ? ActiveMQDefaultConfiguration.getDefaultRoutingType() : routingType);
AddressInfo info = postOffice.getAddressInfo(addressName);
+ boolean addressAlreadyExists = true;
+
if (info == null) {
if (autoCreateAddress) {
- postOffice.addAddressInfo(defaultAddressInfo.setAutoCreated(true));
- info = postOffice.getAddressInfo(addressName);
+ createAddressInfo(defaultAddressInfo.setAutoCreated(true));
+ addressAlreadyExists = false;
} else {
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(addressName);
}
@@ -2486,12 +2488,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final Queue queue = queueFactory.createQueueWith(queueConfig);
- boolean addressAlreadyExists = true;
-
AddressInfo addressInfo = postOffice.getAddressInfo(queue.getAddress());
if (addressInfo == null) {
- postOffice.addAddressInfo(new AddressInfo(queue.getAddress()));
- addressAlreadyExists = false;
+ createAddressInfo(new AddressInfo(queue.getAddress()));
} else {
if (!addressInfo.getRoutingTypes().contains(routingType)) {
throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeForAddress(routingType, addressInfo.getName().toString(), addressInfo.getRoutingTypes());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/683ae689/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressConfigTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressConfigTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressConfigTest.java
index 4e3f689..0beaab7 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressConfigTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressConfigTest.java
@@ -52,6 +52,6 @@ public class AddressConfigTest extends ActiveMQTestBase {
Set<RoutingType> routingTypeSet = new HashSet<>();
routingTypeSet.add(RoutingType.MULTICAST);
- assertEquals(RoutingType.MULTICAST, addressInfo.getRoutingTypes());
+ assertEquals(routingTypeSet, addressInfo.getRoutingTypes());
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/683ae689/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AnycastTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AnycastTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AnycastTest.java
new file mode 100644
index 0000000..9208386
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AnycastTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.addressing;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.RoutingType;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.TimeUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AnycastTest extends ActiveMQTestBase {
+
+ private SimpleString baseAddress = new SimpleString("anycast.address");
+
+ private AddressInfo addressInfo;
+
+ private ActiveMQServer server;
+
+ private ClientSessionFactory sessionFactory;
+
+ @Before
+ public void setup() throws Exception {
+ server = createServer(true);
+ server.start();
+
+ server.waitForActivation(10, TimeUnit.SECONDS);
+
+ ServerLocator sl = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+ sessionFactory = sl.createSessionFactory();
+
+ addSessionFactory(sessionFactory);
+
+ addressInfo = new AddressInfo(baseAddress);
+ addressInfo.addRoutingType(RoutingType.ANYCAST);
+ server.createOrUpdateAddressInfo(addressInfo);
+ }
+
+ @Test
+ public void testTxCommitReceive() throws Exception {
+
+ Queue q1 = server.createQueue(baseAddress, RoutingType.ANYCAST, baseAddress.concat(".1"), null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true);
+ Queue q2 = server.createQueue(baseAddress, RoutingType.ANYCAST, baseAddress.concat(".2"), null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true);
+
+ ClientSession session = sessionFactory.createSession(false, false);
+ session.start();
+
+ ClientConsumer consumer1 = session.createConsumer(q1.getName());
+ ClientConsumer consumer2 = session.createConsumer(q2.getName());
+
+ ClientProducer producer = session.createProducer(baseAddress);
+
+ final int num = 10;
+
+ for (int i = 0; i < num; i++) {
+ ClientMessage m = session.createMessage(ClientMessage.TEXT_TYPE, true);
+ m.getBodyBuffer().writeString("AnyCast" + i);
+ producer.send(m);
+ }
+ assertNull(consumer1.receive(200));
+ assertNull(consumer2.receive(200));
+ session.commit();
+
+ assertTrue(TimeUtils.waitOnBoolean(true, 2000, () -> num / 2 == q1.getMessageCount()));
+ assertTrue(TimeUtils.waitOnBoolean(true, 2000, () -> num / 2 == q2.getMessageCount()));
+
+ ClientConsumer[] consumers = new ClientConsumer[]{consumer1, consumer2};
+ for (int i = 0; i < consumers.length; i++) {
+
+ for (int j = 0; j < num / 2; j++) {
+ ClientMessage m = consumers[i].receive(2000);
+ assertNotNull(m);
+ System.out.println("consumer" + i + " received: " + m.getBodyBuffer().readString());
+ }
+
+ assertNull(consumers[i].receive(200));
+ session.commit();
+
+ assertNull(consumers[i].receive(200));
+ }
+
+ q1.deleteQueue();
+ q2.deleteQueue();
+ }
+
+ @Test
+ public void testTxRollbackReceive() throws Exception {
+
+ Queue q1 = server.createQueue(baseAddress, RoutingType.ANYCAST, baseAddress.concat(".1"), null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true);
+ Queue q2 = server.createQueue(baseAddress, RoutingType.ANYCAST, baseAddress.concat(".2"), null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true);
+
+ ClientSession session = sessionFactory.createSession(false, false);
+ session.start();
+
+ ClientConsumer consumer1 = session.createConsumer(q1.getName());
+ ClientConsumer consumer2 = session.createConsumer(q2.getName());
+
+ ClientProducer producer = session.createProducer(baseAddress);
+
+ final int num = 10;
+
+ for (int i = 0; i < num; i++) {
+ ClientMessage m = session.createMessage(ClientMessage.TEXT_TYPE, true);
+ m.getBodyBuffer().writeString("AnyCast" + i);
+ producer.send(m);
+ }
+ assertNull(consumer1.receive(200));
+ assertNull(consumer2.receive(200));
+ session.commit();
+ session.close();
+
+ assertTrue(TimeUtils.waitOnBoolean(true, 2000, () -> num / 2 == q1.getMessageCount()));
+ assertTrue(TimeUtils.waitOnBoolean(true, 2000, () -> num / 2 == q2.getMessageCount()));
+
+ ClientSession session1 = sessionFactory.createSession(false, false);
+ ClientSession session2 = sessionFactory.createSession(false, false);
+ session1.start();
+ session2.start();
+
+ consumer1 = session1.createConsumer(q1.getName());
+ consumer2 = session2.createConsumer(q2.getName());
+
+ ClientConsumer[] consumers = new ClientConsumer[]{consumer1, consumer2};
+ ClientSession[] sessions = new ClientSession[]{session1, session2};
+ Queue[] queues = new Queue[]{q1, q2};
+
+ for (int i = 0; i < consumers.length; i++) {
+
+ for (int j = 0; j < num / 2; j++) {
+ ClientMessage m = consumers[i].receive(2000);
+ assertNotNull(m);
+ System.out.println("consumer" + i + " received: " + m.getBodyBuffer().readString());
+ }
+
+ assertNull(consumers[i].receive(200));
+ sessions[i].rollback();
+ sessions[i].close();
+
+ sessions[i] = sessionFactory.createSession(false, false);
+ sessions[i].start();
+
+ //receive same after rollback
+ consumers[i] = sessions[i].createConsumer(queues[i].getName());
+
+ for (int j = 0; j < num / 2; j++) {
+ ClientMessage m = consumers[i].receive(2000);
+ assertNotNull(m);
+ System.out.println("consumer" + i + " received: " + m.getBodyBuffer().readString());
+ }
+
+ assertNull(consumers[i].receive(200));
+ sessions[i].commit();
+
+ assertNull(consumers[i].receive(200));
+ sessions[i].close();
+ }
+
+ q1.deleteQueue();
+ q2.deleteQueue();
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/683ae689/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JmsNettyNioStressTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JmsNettyNioStressTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JmsNettyNioStressTest.java
index a721aca..ccbc4b2 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JmsNettyNioStressTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JmsNettyNioStressTest.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
@@ -36,6 +37,7 @@ import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
@@ -128,8 +130,10 @@ public class JmsNettyNioStressTest extends ActiveMQTestBase {
// create the 2 queues used in the test
ClientSessionFactory sf = locator.createSessionFactory(transpConf);
ClientSession session = sf.createTransactedSession();
- session.createQueue("queue", "queue");
- session.createQueue("queue2", "queue2");
+ session.createAddress(SimpleString.toSimpleString("queue"), RoutingType.ANYCAST, false);
+ session.createAddress(SimpleString.toSimpleString("queue2"), RoutingType.ANYCAST, false);
+ session.createQueue("queue", RoutingType.ANYCAST, "queue");
+ session.createQueue("queue2", RoutingType.ANYCAST, "queue2");
session.commit();
sf.close();
session.close();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/683ae689/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/clientcrash/PendingDeliveriesTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/clientcrash/PendingDeliveriesTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/clientcrash/PendingDeliveriesTest.java
index e550bef..0738562 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/clientcrash/PendingDeliveriesTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/clientcrash/PendingDeliveriesTest.java
@@ -26,6 +26,7 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.util.SpawnedVMSupport;
@@ -39,7 +40,7 @@ public class PendingDeliveriesTest extends ClientTestBase {
@Before
public void createQueue() throws Exception {
- server.createQueue(SimpleString.toSimpleString("queue1"), SimpleString.toSimpleString("queue1"), null, true, false);
+ server.createQueue(SimpleString.toSimpleString("queue1"), RoutingType.ANYCAST, SimpleString.toSimpleString("queue1"), null, true, false);
}
@After
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/683ae689/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSFailoverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSFailoverTest.java
index 6e960f2..9fa9ac4 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSFailoverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSFailoverTest.java
@@ -51,6 +51,7 @@ import org.apache.activemq.artemis.core.registry.JndiBindingRegistry;
import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.NodeManager;
+import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQSession;
@@ -198,7 +199,7 @@ public class JMSFailoverTest extends ActiveMQTestBase {
SimpleString jmsQueueName = new SimpleString("myqueue");
- coreSession.createQueue(jmsQueueName, jmsQueueName, null, true);
+ coreSession.createQueue(jmsQueueName, RoutingType.ANYCAST, jmsQueueName, null, true);
Queue queue = sess.createQueue("myqueue");
@@ -271,7 +272,7 @@ public class JMSFailoverTest extends ActiveMQTestBase {
SimpleString jmsQueueName = new SimpleString("myqueue");
- coreSessionLive.createQueue(jmsQueueName, jmsQueueName, null, true);
+ coreSessionLive.createQueue(jmsQueueName, RoutingType.ANYCAST, jmsQueueName, null, true);
Queue queue = sessLive.createQueue("myqueue");
@@ -377,7 +378,7 @@ public class JMSFailoverTest extends ActiveMQTestBase {
}
};
- coreSession.createQueue(QUEUE, QUEUE, true);
+ coreSession.createQueue(QUEUE, RoutingType.ANYCAST, QUEUE, true);
Queue queue = sess.createQueue("somequeue");
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/683ae689/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
index c342853..58d75d8 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
@@ -1096,8 +1096,8 @@ public class MQTTTest extends MQTTTestSupport {
connection.start();
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- javax.jms.Queue queue = s.createQueue(destinationName);
- MessageProducer producer = s.createProducer(queue);
+ javax.jms.Topic topic = s.createTopic(destinationName);
+ MessageProducer producer = s.createProducer(topic);
// send retained message from JMS
final byte[] bytes = new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
@@ -1626,10 +1626,7 @@ public class MQTTTest extends MQTTTestSupport {
SimpleString coreAddress = new SimpleString("foo.bar");
Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)};
- AddressInfo addressInfo = new AddressInfo(coreAddress);
- getServer().createOrUpdateAddressInfo(addressInfo);
-
- getServer().createQueue(coreAddress, RoutingType.MULTICAST, new SimpleString(clientId + "." + coreAddress), null, false, true, 0, false, false);
+ getServer().createQueue(coreAddress, RoutingType.MULTICAST, new SimpleString(clientId + "." + coreAddress), null, false, true, 0, false, true);
MQTT mqtt = createMQTTConnection();
mqtt.setClientId(clientId);
@@ -1675,7 +1672,7 @@ public class MQTTTest extends MQTTTestSupport {
try {
String clientId = "testMqtt";
SimpleString coreAddress = new SimpleString("foo.bar");
- getServer().createQueue(coreAddress, RoutingType.MULTICAST, new SimpleString(clientId + "." + coreAddress), null, false, true, Queue.MAX_CONSUMERS_UNLIMITED, true, false);
+ getServer().createQueue(coreAddress, RoutingType.MULTICAST, new SimpleString(clientId + "." + coreAddress), null, false, true, Queue.MAX_CONSUMERS_UNLIMITED, true, true);
Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)};
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/683ae689/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
index 4fccfa3..a6bb55f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
@@ -49,6 +49,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
+import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
@@ -692,7 +693,7 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
Connection conn1 = null;
SimpleString durableQueue = new SimpleString("exampleQueue");
- this.server.createQueue(durableQueue, durableQueue, null, true, false);
+ this.server.createQueue(durableQueue, RoutingType.ANYCAST, durableQueue, null, true, false);
Queue queue = ActiveMQJMSClient.createQueue("exampleQueue");
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/683ae689/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ResourceLimitTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ResourceLimitTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ResourceLimitTest.java
index a672bb1..2e77bb9 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ResourceLimitTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ResourceLimitTest.java
@@ -59,7 +59,7 @@ public class ResourceLimitTest extends ActiveMQTestBase {
ActiveMQJAASSecurityManager securityManager = (ActiveMQJAASSecurityManager) server.getSecurityManager();
securityManager.getConfiguration().addUser("myUser", "password");
securityManager.getConfiguration().addRole("myUser", "arole");
- Role role = new Role("arole", false, false, false, false, true, true, false, true, false, false);
+ Role role = new Role("arole", false, false, false, false, true, true, false, true, false, true);
Set<Role> roles = new HashSet<>();
roles.add(role);
server.getSecurityRepository().addMatch("#", roles);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/683ae689/tests/joram-tests/src/test/java/org/apache/activemq/artemis/common/AbstractAdmin.java
----------------------------------------------------------------------
diff --git a/tests/joram-tests/src/test/java/org/apache/activemq/artemis/common/AbstractAdmin.java b/tests/joram-tests/src/test/java/org/apache/activemq/artemis/common/AbstractAdmin.java
index 2ada3be..bcff21b 100644
--- a/tests/joram-tests/src/test/java/org/apache/activemq/artemis/common/AbstractAdmin.java
+++ b/tests/joram-tests/src/test/java/org/apache/activemq/artemis/common/AbstractAdmin.java
@@ -149,7 +149,7 @@ public class AbstractAdmin implements Admin {
@Override
public void createTopic(final String name) {
try {
- invokeSyncOperation(ResourceNames.BROKER, "createAddress", name, new Object[]{"MULTICAST"});
+ invokeSyncOperation(ResourceNames.BROKER, "createAddress", name, "MULTICAST");
} catch (Exception e) {
throw new IllegalStateException(e);
}