You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/05/31 17:18:35 UTC
activemq-artemis git commit: ARTEMIS-1568 / ARTEMIS-1858 Expiry
messages are not transversing clustering with AMQP [Forced Update!]
Repository: activemq-artemis
Updated Branches:
refs/heads/2.6.x 43ab7ec7f -> 03e603b5c (forced update)
ARTEMIS-1568 / ARTEMIS-1858 Expiry messages are not transversing clustering with AMQP
(cherry picked from commit 1ae2784dc6075875b18780fa8ba40f86cb895f7b)
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/03e603b5
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/03e603b5
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/03e603b5
Branch: refs/heads/2.6.x
Commit: 03e603b5c35cb1eccb552f6e0949968d3a200992
Parents: e819c27
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue May 29 22:12:58 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu May 31 13:18:20 2018 -0400
----------------------------------------------------------------------
.../activemq/artemis/api/core/Message.java | 18 ++
.../message/impl/CoreMessageObjectPools.java | 4 +
.../protocol/amqp/broker/AMQPMessage.java | 44 ++-
.../amqp/broker/AMQPSessionCallback.java | 2 +-
.../amqp/converter/AmqpCoreConverter.java | 3 +
.../amqp/converter/CoreAmqpConverter.java | 2 +-
.../protocol/amqp/message/AMQPMessageTest.java | 4 +-
.../core/postoffice/impl/PostOfficeImpl.java | 4 +-
.../ProtocolsMessageLoadBalancingTest.java | 284 +++++++++++++++++++
.../cluster/crossprotocol/package-info.java | 20 ++
10 files changed, 366 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/03e603b5/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index 6ca37ea..667f95f 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -355,10 +355,28 @@ public interface Message {
String getAddress();
+ /**
+ * Look at {@link #setAddress(SimpleString)} for the doc.
+ * @param address
+ * @return
+ */
Message setAddress(String address);
SimpleString getAddressSimpleString();
+ /**
+ * This will set the address on CoreMessage.
+ *
+ * Note for AMQPMessages:
+ * in AMQPMessages this will not really change the address on the message. Instead it will add a property
+ * on extraProperties which only transverse internally at the broker.
+ * Whatever you change here it won't affect anything towards the received message.
+ *
+ * If you wish to change AMQPMessages address you will have to do it directly at the AMQP Message, however beware
+ * that AMQPMessages are not supposed to be changed at the broker, so only do it if you know what you are doing.
+ * @param address
+ * @return
+ */
Message setAddress(SimpleString address);
long getTimestamp();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/03e603b5/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java
index 7ee7d0a..4c56eac 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java
@@ -54,4 +54,8 @@ public class CoreMessageObjectPools {
public TypedProperties.TypedPropertiesStringSimpleStringPools getPropertiesStringSimpleStringPools() {
return propertiesStringSimpleStringPools.get();
}
+
+ public static SimpleString cachedAddressSimpleString(String address, CoreMessageObjectPools coreMessageObjectPools) {
+ return SimpleString.toSimpleString(address, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressStringSimpleStringPool());
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/03e603b5/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 2775f77..27d571e 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -67,6 +67,8 @@ import io.netty.buffer.Unpooled;
// see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format
public class AMQPMessage extends RefCountMessage {
+ public static final SimpleString ADDRESS_PROPERTY = SimpleString.toSimpleString("_AMQ_AD");
+
public static final int DEFAULT_MESSAGE_PRIORITY = 4;
public static final int MAX_MESSAGE_PRIORITY = 9;
@@ -103,19 +105,20 @@ public class AMQPMessage extends RefCountMessage {
* these are properties created by the broker only */
private volatile TypedProperties extraProperties;
- public AMQPMessage(long messageFormat, byte[] data) {
- this(messageFormat, data, null);
+ public AMQPMessage(long messageFormat, byte[] data, TypedProperties extraProperties) {
+ this(messageFormat, data, extraProperties, null);
}
- public AMQPMessage(long messageFormat, byte[] data, CoreMessageObjectPools coreMessageObjectPools) {
- this(messageFormat, ReadableBuffer.ByteBufferReader.wrap(ByteBuffer.wrap(data)), coreMessageObjectPools);
+ public AMQPMessage(long messageFormat, byte[] data, TypedProperties extraProperties, CoreMessageObjectPools coreMessageObjectPools) {
+ this(messageFormat, ReadableBuffer.ByteBufferReader.wrap(ByteBuffer.wrap(data)), extraProperties, coreMessageObjectPools);
}
- public AMQPMessage(long messageFormat, ReadableBuffer data, CoreMessageObjectPools coreMessageObjectPools) {
+ public AMQPMessage(long messageFormat, ReadableBuffer data, TypedProperties extraProperties, CoreMessageObjectPools coreMessageObjectPools) {
this.data = data;
this.messageFormat = messageFormat;
this.bufferValid = true;
this.coreMessageObjectPools = coreMessageObjectPools;
+ this.extraProperties = extraProperties == null ? null : new TypedProperties(extraProperties);
parseHeaders();
}
@@ -496,7 +499,7 @@ public class AMQPMessage extends RefCountMessage {
view.position(messagePaylodStart);
view.get(newData, headerEnds, view.remaining());
- AMQPMessage newEncode = new AMQPMessage(this.messageFormat, newData);
+ AMQPMessage newEncode = new AMQPMessage(this.messageFormat, newData, extraProperties, coreMessageObjectPools);
newEncode.setDurable(isDurable()).setMessageID(this.getMessageID());
return newEncode;
}
@@ -604,26 +607,40 @@ public class AMQPMessage extends RefCountMessage {
return addressSimpleString == null ? null : addressSimpleString.toString();
}
+
+ public SimpleString cachedAddressSimpleString(String address) {
+ return CoreMessageObjectPools.cachedAddressSimpleString(address, coreMessageObjectPools);
+ }
+
@Override
public AMQPMessage setAddress(String address) {
- this.address = SimpleString.toSimpleString(address, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressStringSimpleStringPool());
+ setAddress(cachedAddressSimpleString(address));
return this;
}
@Override
public AMQPMessage setAddress(SimpleString address) {
this.address = address;
+ createExtraProperties().putSimpleStringProperty(ADDRESS_PROPERTY, address);
return this;
}
@Override
public SimpleString getAddressSimpleString() {
if (address == null) {
- Properties properties = getProtonMessage().getProperties();
- if (properties != null) {
- setAddress(properties.getTo());
- } else {
- return null;
+ TypedProperties extraProperties = getExtraProperties();
+
+ // we first check if extraProperties is not null, no need to create it just to check it here
+ if (extraProperties != null) {
+ address = extraProperties.getSimpleStringProperty(ADDRESS_PROPERTY);
+ }
+
+ if (address == null) {
+ // if it still null, it will look for the address on the getTo();
+ Properties properties = getProperties();
+ if (properties != null && properties.getTo() != null) {
+ address = cachedAddressSimpleString(properties.getTo());
+ }
}
}
return address;
@@ -1261,6 +1278,9 @@ public class AMQPMessage extends RefCountMessage {
", messageID=" + getMessageID() +
", address=" + getAddress() +
", size=" + getEncodeSize() +
+ ", applicationProperties=" + getApplicationProperties() +
+ ", properties=" + getProperties() +
+ ", extraProperties = " + getExtraProperties() +
"]";
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/03e603b5/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index aac3f2a..6461bb2 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -460,7 +460,7 @@ public class AMQPSessionCallback implements SessionCallback {
SimpleString address,
int messageFormat,
ReadableBuffer data) throws Exception {
- AMQPMessage message = new AMQPMessage(messageFormat, data, coreMessageObjectPools);
+ AMQPMessage message = new AMQPMessage(messageFormat, data, null, coreMessageObjectPools);
if (address != null) {
message.setAddress(address);
} else {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/03e603b5/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
index 80969f6..d070579 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
@@ -179,6 +179,9 @@ public class AmqpCoreConverter {
TypedProperties properties = message.getExtraProperties();
if (properties != null) {
for (SimpleString str : properties.getPropertyNames()) {
+ if (str.equals(AMQPMessage.ADDRESS_PROPERTY)) {
+ continue;
+ }
result.getInnerMessage().putBytesProperty(str, properties.getBytesProperty(str));
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/03e603b5/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
index abda58a..66c75a8 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
@@ -316,7 +316,7 @@ public class CoreAmqpConverter {
byte[] data = new byte[buffer.writerIndex()];
buffer.readBytes(data);
- AMQPMessage amqpMessage = new AMQPMessage(messageFormat, data);
+ AMQPMessage amqpMessage = new AMQPMessage(messageFormat, data, null);
amqpMessage.setMessageID(message.getInnerMessage().getMessageID());
amqpMessage.setReplyTo(coreMessage.getReplyTo());
return amqpMessage;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/03e603b5/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java
index bff43a8..42ffaee 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java
@@ -291,7 +291,7 @@ public class AMQPMessageTest {
byte[] bytes = new byte[nettyBuffer.writerIndex()];
nettyBuffer.readBytes(bytes);
- return new AMQPMessage(0, bytes);
+ return new AMQPMessage(0, bytes, null);
}
private AMQPMessage encodeDelivery(AMQPMessage message, int deliveryCount) {
@@ -302,6 +302,6 @@ public class AMQPMessageTest {
byte[] bytes = new byte[nettyBuffer.writerIndex()];
nettyBuffer.readBytes(bytes);
- return new AMQPMessage(0, bytes);
+ return new AMQPMessage(0, bytes, null);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/03e603b5/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index b722532..0e21da2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -977,9 +977,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
// arrived the target node
// as described on https://issues.jboss.org/browse/JBPAPP-6130
Message copyRedistribute = message.copy(storageManager.generateID());
- if (copyRedistribute.getAddress() == null) {
- copyRedistribute.setAddress(originatingQueue.getAddress());
- }
+ copyRedistribute.setAddress(originatingQueue.getAddress());
if (tx != null) {
tx.addOperation(new TransactionOperationAbstract() {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/03e603b5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/ProtocolsMessageLoadBalancingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/ProtocolsMessageLoadBalancingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/ProtocolsMessageLoadBalancingTest.java
new file mode 100644
index 0000000..f1d0906
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/ProtocolsMessageLoadBalancingTest.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.cluster.crossprotocol;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
+import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
+import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
+import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(value = Parameterized.class)
+public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase {
+
+ private static final int NUMBER_OF_SERVERS = 2;
+ private static final SimpleString queueName = SimpleString.toSimpleString("queues.0");
+
+
+ // I'm taking any number that /2 = Odd
+ // to avoid perfect roundings and making sure messages are evenly distributed
+ private static final int NUMBER_OF_MESSAGES = 77 * 2;
+
+
+ @Parameterized.Parameters(name = "protocol={0}")
+ public static Collection getParameters() {
+ return Arrays.asList(new Object[][]{{"AMQP"}, {"CORE"}});
+ }
+
+ @Parameterized.Parameter(0)
+ public String protocol;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+
+ }
+
+ private void startServers(MessageLoadBalancingType loadBalancingType) throws Exception {
+ setupServers();
+
+ setRedistributionDelay(0);
+
+ setupCluster(loadBalancingType);
+
+ AddressSettings as = new AddressSettings().setRedistributionDelay(0).setExpiryAddress(SimpleString.toSimpleString("queues.expiry"));
+
+ getServer(0).getAddressSettingsRepository().addMatch("queues.*", as);
+ getServer(1).getAddressSettingsRepository().addMatch("queues.*", as);
+
+ startServers(0);
+ startServers(1);
+
+ createQueue(SimpleString.toSimpleString("queues.expiry"));
+ createQueue(queueName);
+ }
+
+ private void createQueue(SimpleString queueName) throws Exception {
+ servers[0].createQueue(queueName, RoutingType.ANYCAST, queueName, (SimpleString) null, (SimpleString) null, true, false, false, false, false, -1, false, false, false, true);
+ servers[1].createQueue(queueName, RoutingType.ANYCAST, queueName, (SimpleString) null, (SimpleString) null, true, false, false, false, false, -1, false, false, false, true);
+ }
+
+ protected boolean isNetty() {
+ return true;
+ }
+
+ private ConnectionFactory getJmsConnectionFactory(int node) {
+ if (protocol.equals("AMQP")) {
+ return new JmsConnectionFactory("amqp://localhost:" + (61616 + node));
+ } else {
+ return new ActiveMQConnectionFactory("tcp://localhost:" + (61616 + node));
+ }
+ }
+
+ private void pauseClusteringBridges(ActiveMQServer server) throws Exception {
+ for (ClusterConnection clusterConnection : server.getClusterManager().getClusterConnections()) {
+ for (MessageFlowRecord record : ((ClusterConnectionImpl)clusterConnection).getRecords().values()) {
+ record.getBridge().pause();
+ }
+ }
+ }
+
+ @Test
+ public void testLoadBalancing() throws Exception {
+
+ startServers(MessageLoadBalancingType.STRICT);
+
+ ConnectionFactory[] factory = new ConnectionFactory[NUMBER_OF_SERVERS];
+ Connection[] connection = new Connection[NUMBER_OF_SERVERS];
+ Session[] session = new Session[NUMBER_OF_SERVERS];
+ MessageConsumer[] consumer = new MessageConsumer[NUMBER_OF_SERVERS];
+
+ // this will pre create consumers to make sure messages are distributed evenly without redistribution
+ for (int node = 0; node < NUMBER_OF_SERVERS; node++) {
+ factory[node] = getJmsConnectionFactory(node);
+ connection[node] = factory[node].createConnection();
+ session[node] = connection[node].createSession(false, Session.AUTO_ACKNOWLEDGE);
+ consumer[node] = session[node].createConsumer(session[node].createQueue(queueName.toString()));
+ }
+
+ waitForBindings(0, "queues.0", 1, 1, true);
+ waitForBindings(1, "queues.0", 1, 1, true);
+
+ waitForBindings(0, "queues.0", 1, 1, false);
+ waitForBindings(1, "queues.0", 1, 1, false);
+
+ pauseClusteringBridges(servers[0]);
+
+
+ // sending Messages.. they should be load balanced
+ {
+ ConnectionFactory cf = getJmsConnectionFactory(0);
+ Connection cn = cf.createConnection();
+ Session sn = cn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer pd = sn.createProducer(sn.createQueue(queueName.toString()));
+
+ for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+ pd.send(sn.createTextMessage("hello " + i));
+ }
+
+ cn.close();
+ }
+
+ receiveMessages(connection[0], consumer[0], NUMBER_OF_MESSAGES / 2, true);
+ connection[1].start();
+ Assert.assertNull(consumer[1].receiveNoWait());
+ connection[1].stop();
+
+ servers[0].stop();
+ clearServer(0);
+
+ setupServer(0, isFileStorage(), isNetty());
+ servers[0].addProtocolManagerFactory(new ProtonProtocolManagerFactory());
+
+ setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.STRICT, 1, isNetty(), 0, 1);
+
+ servers[0].start();
+
+ receiveMessages(connection[1], consumer[1], NUMBER_OF_MESSAGES / 2, true);
+ for (int node = 0; node < NUMBER_OF_SERVERS; node++) {
+ connection[node].close();
+ }
+
+ }
+
+ @Test
+ public void testExpireRedistributed() throws Exception {
+
+ startServers(MessageLoadBalancingType.ON_DEMAND);
+
+ ConnectionFactory factory = getJmsConnectionFactory(1);
+
+
+ waitForBindings(0, "queues.0", 1, 0, true);
+ waitForBindings(1, "queues.0", 1, 0, true);
+
+ waitForBindings(0, "queues.0", 1, 0, false);
+ waitForBindings(1, "queues.0", 1, 0, false);
+
+
+ // sending Messages..
+ {
+ ConnectionFactory cf = getJmsConnectionFactory(0);
+ Connection cn = cf.createConnection();
+ Session sn = cn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer pd = sn.createProducer(sn.createQueue(queueName.toString()));
+ pd.setTimeToLive(200);
+
+ for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+ pd.send(sn.createTextMessage("hello " + i));
+ }
+
+ cn.close();
+ }
+
+ // time to let stuff expire
+ Thread.sleep(200);
+
+
+ Connection connection = factory.createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(session.createQueue("queues.expiry"));
+
+ receiveMessages(connection, consumer, NUMBER_OF_MESSAGES, true);
+ connection.close();
+ }
+
+ private void receiveMessages(Connection connection,
+ MessageConsumer messageConsumer,
+ int messageCount,
+ boolean exactCount) throws JMSException {
+ connection.start();
+
+ for (int i = 0; i < messageCount; i++) {
+ Message msg = messageConsumer.receive(5000);
+ Assert.assertNotNull(msg);
+ }
+
+ // this means no more messages received
+ if (exactCount) {
+ Assert.assertNull(messageConsumer.receiveNoWait());
+ }
+ }
+
+ protected void setupCluster(final MessageLoadBalancingType messageLoadBalancingType) throws Exception {
+ setupClusterConnection("cluster0", "queues", messageLoadBalancingType, 1, isNetty(), 0, 1);
+
+ setupClusterConnection("cluster1", "queues", messageLoadBalancingType, 1, isNetty(), 1, 0);
+ }
+
+ protected void setRedistributionDelay(final long delay) {
+ }
+
+ protected void setupServers() throws Exception {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+
+ servers[0].addProtocolManagerFactory(new ProtonProtocolManagerFactory());
+ servers[1].addProtocolManagerFactory(new ProtonProtocolManagerFactory());
+ }
+
+ protected void stopServers() throws Exception {
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ closeAllServerLocatorsFactories();
+
+ stopServers(0, 1);
+
+ clearServer(0, 1);
+ }
+
+ /**
+ * @param serverID
+ * @return
+ * @throws Exception
+ */
+ @Override
+ protected ConfigurationImpl createBasicConfig(final int serverID) {
+ ConfigurationImpl configuration = super.createBasicConfig(serverID);
+ configuration.setMessageExpiryScanPeriod(100);
+
+ return configuration;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/03e603b5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/package-info.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/package-info.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/package-info.java
new file mode 100644
index 0000000..e9b7c01
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains tests about messages crossing over protocols
+ */
+package org.apache.activemq.artemis.tests.integration.cluster.crossprotocol;
\ No newline at end of file