You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2018/05/31 09:48:26 UTC

[1/2] activemq-artemis git commit: ARTEMIS-1858 Expiry messages are not transversing clustering with AMQP

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 9b7ebef9f -> 545b82fbd


ARTEMIS-1858 Expiry messages are not transversing clustering with AMQP


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/1ae2784d
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/1ae2784d
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/1ae2784d

Branch: refs/heads/master
Commit: 1ae2784dc6075875b18780fa8ba40f86cb895f7b
Parents: 9b7ebef
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue May 29 22:12:58 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed May 30 18:57:38 2018 -0400

----------------------------------------------------------------------
 .../activemq/artemis/api/core/Message.java      |  18 ++
 .../message/impl/CoreMessageObjectPools.java    |   4 +
 .../protocol/amqp/broker/AMQPMessage.java       |  44 ++-
 .../amqp/broker/AMQPSessionCallback.java        |   2 +-
 .../amqp/converter/AmqpCoreConverter.java       |   3 +
 .../amqp/converter/CoreAmqpConverter.java       |   2 +-
 .../protocol/amqp/message/AMQPMessageTest.java  |   4 +-
 .../core/postoffice/impl/PostOfficeImpl.java    |   4 +-
 .../ProtocolsMessageLoadBalancingTest.java      | 284 +++++++++++++++++++
 .../cluster/crossprotocol/package-info.java     |  20 ++
 10 files changed, 366 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ae2784d/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index 6ca37ea..667f95f 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -355,10 +355,28 @@ public interface Message {
 
    String getAddress();
 
+   /**
+    * Look at {@link #setAddress(SimpleString)} for the doc.
+    * @param address
+    * @return
+    */
    Message setAddress(String address);
 
    SimpleString getAddressSimpleString();
 
+   /**
+    * This will set the address on CoreMessage.
+    *
+    * Note for AMQPMessages:
+    * in AMQPMessages this will not really change the address on the message. Instead it will add a property
+    * on extraProperties which only transverse internally at the broker.
+    * Whatever you change here it won't affect anything towards the received message.
+    *
+    * If you wish to change AMQPMessages address you will have to do it directly at the AMQP Message, however beware
+    * that AMQPMessages are not supposed to be changed at the broker, so only do it if you know what you are doing.
+    * @param address
+    * @return
+    */
    Message setAddress(SimpleString address);
 
    long getTimestamp();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ae2784d/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java
index 7ee7d0a..4c56eac 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java
@@ -54,4 +54,8 @@ public class CoreMessageObjectPools {
    public TypedProperties.TypedPropertiesStringSimpleStringPools getPropertiesStringSimpleStringPools() {
       return propertiesStringSimpleStringPools.get();
    }
+
+   public static SimpleString cachedAddressSimpleString(String address, CoreMessageObjectPools coreMessageObjectPools) {
+      return SimpleString.toSimpleString(address, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressStringSimpleStringPool());
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ae2784d/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 2775f77..27d571e 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -67,6 +67,8 @@ import io.netty.buffer.Unpooled;
 // see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format
 public class AMQPMessage extends RefCountMessage {
 
+   public static final SimpleString ADDRESS_PROPERTY = SimpleString.toSimpleString("_AMQ_AD");
+
    public static final int DEFAULT_MESSAGE_PRIORITY = 4;
    public static final int MAX_MESSAGE_PRIORITY = 9;
 
@@ -103,19 +105,20 @@ public class AMQPMessage extends RefCountMessage {
     *  these are properties created by the broker only */
    private volatile TypedProperties extraProperties;
 
-   public AMQPMessage(long messageFormat, byte[] data) {
-      this(messageFormat, data, null);
+   public AMQPMessage(long messageFormat, byte[] data, TypedProperties extraProperties) {
+      this(messageFormat, data, extraProperties, null);
    }
 
-   public AMQPMessage(long messageFormat, byte[] data, CoreMessageObjectPools coreMessageObjectPools) {
-      this(messageFormat, ReadableBuffer.ByteBufferReader.wrap(ByteBuffer.wrap(data)), coreMessageObjectPools);
+   public AMQPMessage(long messageFormat, byte[] data, TypedProperties extraProperties, CoreMessageObjectPools coreMessageObjectPools) {
+      this(messageFormat, ReadableBuffer.ByteBufferReader.wrap(ByteBuffer.wrap(data)), extraProperties, coreMessageObjectPools);
    }
 
-   public AMQPMessage(long messageFormat, ReadableBuffer data, CoreMessageObjectPools coreMessageObjectPools) {
+   public AMQPMessage(long messageFormat, ReadableBuffer data, TypedProperties extraProperties, CoreMessageObjectPools coreMessageObjectPools) {
       this.data = data;
       this.messageFormat = messageFormat;
       this.bufferValid = true;
       this.coreMessageObjectPools = coreMessageObjectPools;
+      this.extraProperties = extraProperties == null ? null : new TypedProperties(extraProperties);
       parseHeaders();
    }
 
@@ -496,7 +499,7 @@ public class AMQPMessage extends RefCountMessage {
       view.position(messagePaylodStart);
       view.get(newData, headerEnds, view.remaining());
 
-      AMQPMessage newEncode = new AMQPMessage(this.messageFormat, newData);
+      AMQPMessage newEncode = new AMQPMessage(this.messageFormat, newData, extraProperties, coreMessageObjectPools);
       newEncode.setDurable(isDurable()).setMessageID(this.getMessageID());
       return newEncode;
    }
@@ -604,26 +607,40 @@ public class AMQPMessage extends RefCountMessage {
       return addressSimpleString == null ? null : addressSimpleString.toString();
    }
 
+
+   public SimpleString cachedAddressSimpleString(String address) {
+      return CoreMessageObjectPools.cachedAddressSimpleString(address, coreMessageObjectPools);
+   }
+
    @Override
    public AMQPMessage setAddress(String address) {
-      this.address = SimpleString.toSimpleString(address, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressStringSimpleStringPool());
+      setAddress(cachedAddressSimpleString(address));
       return this;
    }
 
    @Override
    public AMQPMessage setAddress(SimpleString address) {
       this.address = address;
+      createExtraProperties().putSimpleStringProperty(ADDRESS_PROPERTY, address);
       return this;
    }
 
    @Override
    public SimpleString getAddressSimpleString() {
       if (address == null) {
-         Properties properties = getProtonMessage().getProperties();
-         if (properties != null) {
-            setAddress(properties.getTo());
-         } else {
-            return null;
+         TypedProperties extraProperties = getExtraProperties();
+
+         // we first check if extraProperties is not null, no need to create it just to check it here
+         if (extraProperties != null) {
+            address = extraProperties.getSimpleStringProperty(ADDRESS_PROPERTY);
+         }
+
+         if (address == null) {
+            // if it still null, it will look for the address on the getTo();
+            Properties properties = getProperties();
+            if (properties != null && properties.getTo() != null) {
+               address = cachedAddressSimpleString(properties.getTo());
+            }
          }
       }
       return address;
@@ -1261,6 +1278,9 @@ public class AMQPMessage extends RefCountMessage {
          ", messageID=" + getMessageID() +
          ", address=" + getAddress() +
          ", size=" + getEncodeSize() +
+         ", applicationProperties=" + getApplicationProperties() +
+         ", properties=" + getProperties() +
+         ", extraProperties = " + getExtraProperties() +
          "]";
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ae2784d/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index aac3f2a..6461bb2 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -460,7 +460,7 @@ public class AMQPSessionCallback implements SessionCallback {
                           SimpleString address,
                           int messageFormat,
                           ReadableBuffer data) throws Exception {
-      AMQPMessage message = new AMQPMessage(messageFormat, data, coreMessageObjectPools);
+      AMQPMessage message = new AMQPMessage(messageFormat, data, null, coreMessageObjectPools);
       if (address != null) {
          message.setAddress(address);
       } else {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ae2784d/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
index 80969f6..d070579 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
@@ -179,6 +179,9 @@ public class AmqpCoreConverter {
       TypedProperties properties = message.getExtraProperties();
       if (properties != null) {
          for (SimpleString str : properties.getPropertyNames()) {
+            if (str.equals(AMQPMessage.ADDRESS_PROPERTY)) {
+               continue;
+            }
             result.getInnerMessage().putBytesProperty(str, properties.getBytesProperty(str));
          }
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ae2784d/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
index abda58a..66c75a8 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
@@ -316,7 +316,7 @@ public class CoreAmqpConverter {
          byte[] data = new byte[buffer.writerIndex()];
          buffer.readBytes(data);
 
-         AMQPMessage amqpMessage = new AMQPMessage(messageFormat, data);
+         AMQPMessage amqpMessage = new AMQPMessage(messageFormat, data, null);
          amqpMessage.setMessageID(message.getInnerMessage().getMessageID());
          amqpMessage.setReplyTo(coreMessage.getReplyTo());
          return amqpMessage;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ae2784d/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java
index bff43a8..42ffaee 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java
@@ -291,7 +291,7 @@ public class AMQPMessageTest {
       byte[] bytes = new byte[nettyBuffer.writerIndex()];
       nettyBuffer.readBytes(bytes);
 
-      return new AMQPMessage(0, bytes);
+      return new AMQPMessage(0, bytes, null);
    }
 
    private AMQPMessage encodeDelivery(AMQPMessage message, int deliveryCount) {
@@ -302,6 +302,6 @@ public class AMQPMessageTest {
       byte[] bytes = new byte[nettyBuffer.writerIndex()];
       nettyBuffer.readBytes(bytes);
 
-      return new AMQPMessage(0, bytes);
+      return new AMQPMessage(0, bytes, null);
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ae2784d/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index b722532..0e21da2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -977,9 +977,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
          // arrived the target node
          // as described on https://issues.jboss.org/browse/JBPAPP-6130
          Message copyRedistribute = message.copy(storageManager.generateID());
-         if (copyRedistribute.getAddress() == null) {
-            copyRedistribute.setAddress(originatingQueue.getAddress());
-         }
+         copyRedistribute.setAddress(originatingQueue.getAddress());
 
          if (tx != null) {
             tx.addOperation(new TransactionOperationAbstract() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ae2784d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/ProtocolsMessageLoadBalancingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/ProtocolsMessageLoadBalancingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/ProtocolsMessageLoadBalancingTest.java
new file mode 100644
index 0000000..f1d0906
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/ProtocolsMessageLoadBalancingTest.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.cluster.crossprotocol;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
+import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
+import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
+import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(value = Parameterized.class)
+public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase {
+
+   private static final int NUMBER_OF_SERVERS = 2;
+   private static final SimpleString queueName = SimpleString.toSimpleString("queues.0");
+
+
+   // I'm taking any number that /2 = Odd
+   // to avoid perfect roundings and making sure messages are evenly distributed
+   private static final int NUMBER_OF_MESSAGES = 77 * 2;
+
+
+   @Parameterized.Parameters(name = "protocol={0}")
+   public static Collection getParameters() {
+      return Arrays.asList(new Object[][]{{"AMQP"}, {"CORE"}});
+   }
+
+   @Parameterized.Parameter(0)
+   public String protocol;
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+
+   }
+
+   private void startServers(MessageLoadBalancingType loadBalancingType) throws Exception {
+      setupServers();
+
+      setRedistributionDelay(0);
+
+      setupCluster(loadBalancingType);
+
+      AddressSettings as = new AddressSettings().setRedistributionDelay(0).setExpiryAddress(SimpleString.toSimpleString("queues.expiry"));
+
+      getServer(0).getAddressSettingsRepository().addMatch("queues.*", as);
+      getServer(1).getAddressSettingsRepository().addMatch("queues.*", as);
+
+      startServers(0);
+      startServers(1);
+
+      createQueue(SimpleString.toSimpleString("queues.expiry"));
+      createQueue(queueName);
+   }
+
+   private void createQueue(SimpleString queueName) throws Exception {
+      servers[0].createQueue(queueName, RoutingType.ANYCAST, queueName, (SimpleString) null, (SimpleString) null, true, false, false, false, false, -1, false, false, false, true);
+      servers[1].createQueue(queueName, RoutingType.ANYCAST, queueName, (SimpleString) null, (SimpleString) null, true, false, false, false, false, -1, false, false, false, true);
+   }
+
+   protected boolean isNetty() {
+      return true;
+   }
+
+   private ConnectionFactory getJmsConnectionFactory(int node) {
+      if (protocol.equals("AMQP")) {
+         return new JmsConnectionFactory("amqp://localhost:" + (61616 + node));
+      } else {
+         return new ActiveMQConnectionFactory("tcp://localhost:" + (61616 + node));
+      }
+   }
+
+   private void pauseClusteringBridges(ActiveMQServer server) throws Exception {
+      for (ClusterConnection clusterConnection : server.getClusterManager().getClusterConnections()) {
+         for (MessageFlowRecord record : ((ClusterConnectionImpl)clusterConnection).getRecords().values()) {
+            record.getBridge().pause();
+         }
+      }
+   }
+
+   @Test
+   public void testLoadBalancing() throws Exception {
+
+      startServers(MessageLoadBalancingType.STRICT);
+
+      ConnectionFactory[] factory = new ConnectionFactory[NUMBER_OF_SERVERS];
+      Connection[] connection = new Connection[NUMBER_OF_SERVERS];
+      Session[]  session = new Session[NUMBER_OF_SERVERS];
+      MessageConsumer[]  consumer = new MessageConsumer[NUMBER_OF_SERVERS];
+
+      // this will pre create consumers to make sure messages are distributed evenly without redistribution
+      for (int node = 0; node < NUMBER_OF_SERVERS; node++) {
+         factory[node] = getJmsConnectionFactory(node);
+         connection[node] = factory[node].createConnection();
+         session[node] = connection[node].createSession(false, Session.AUTO_ACKNOWLEDGE);
+         consumer[node] = session[node].createConsumer(session[node].createQueue(queueName.toString()));
+      }
+
+      waitForBindings(0, "queues.0", 1, 1, true);
+      waitForBindings(1, "queues.0", 1, 1, true);
+
+      waitForBindings(0, "queues.0", 1, 1, false);
+      waitForBindings(1, "queues.0", 1, 1, false);
+
+      pauseClusteringBridges(servers[0]);
+
+
+      // sending Messages.. they should be load balanced
+      {
+         ConnectionFactory cf =  getJmsConnectionFactory(0);
+         Connection cn = cf.createConnection();
+         Session sn = cn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer pd = sn.createProducer(sn.createQueue(queueName.toString()));
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            pd.send(sn.createTextMessage("hello " + i));
+         }
+
+         cn.close();
+      }
+
+      receiveMessages(connection[0], consumer[0], NUMBER_OF_MESSAGES / 2, true);
+      connection[1].start();
+      Assert.assertNull(consumer[1].receiveNoWait());
+      connection[1].stop();
+
+      servers[0].stop();
+      clearServer(0);
+
+      setupServer(0, isFileStorage(), isNetty());
+      servers[0].addProtocolManagerFactory(new ProtonProtocolManagerFactory());
+
+      setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.STRICT, 1, isNetty(), 0, 1);
+
+      servers[0].start();
+
+      receiveMessages(connection[1], consumer[1], NUMBER_OF_MESSAGES / 2, true);
+      for (int node = 0; node < NUMBER_OF_SERVERS; node++) {
+         connection[node].close();
+      }
+
+   }
+
+   @Test
+   public void testExpireRedistributed() throws Exception {
+
+      startServers(MessageLoadBalancingType.ON_DEMAND);
+
+      ConnectionFactory factory = getJmsConnectionFactory(1);
+
+
+      waitForBindings(0, "queues.0", 1, 0, true);
+      waitForBindings(1, "queues.0", 1, 0, true);
+
+      waitForBindings(0, "queues.0", 1, 0, false);
+      waitForBindings(1, "queues.0", 1, 0, false);
+
+
+      // sending Messages..
+      {
+         ConnectionFactory cf =  getJmsConnectionFactory(0);
+         Connection cn = cf.createConnection();
+         Session sn = cn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer pd = sn.createProducer(sn.createQueue(queueName.toString()));
+         pd.setTimeToLive(200);
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            pd.send(sn.createTextMessage("hello " + i));
+         }
+
+         cn.close();
+      }
+
+      // time to let stuff expire
+      Thread.sleep(200);
+
+
+      Connection connection = factory.createConnection();
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageConsumer consumer = session.createConsumer(session.createQueue("queues.expiry"));
+
+      receiveMessages(connection, consumer, NUMBER_OF_MESSAGES, true);
+      connection.close();
+   }
+
+   private void receiveMessages(Connection connection,
+                                MessageConsumer messageConsumer,
+                                int messageCount,
+                                boolean exactCount) throws JMSException {
+      connection.start();
+
+      for (int i = 0; i < messageCount; i++) {
+         Message msg = messageConsumer.receive(5000);
+         Assert.assertNotNull(msg);
+      }
+
+      // this means no more messages received
+      if (exactCount) {
+         Assert.assertNull(messageConsumer.receiveNoWait());
+      }
+   }
+
+   protected void setupCluster(final MessageLoadBalancingType messageLoadBalancingType) throws Exception {
+      setupClusterConnection("cluster0", "queues", messageLoadBalancingType, 1, isNetty(), 0, 1);
+
+      setupClusterConnection("cluster1", "queues", messageLoadBalancingType, 1, isNetty(), 1, 0);
+   }
+
+   protected void setRedistributionDelay(final long delay) {
+   }
+
+   protected void setupServers() throws Exception {
+      setupServer(0, isFileStorage(), isNetty());
+      setupServer(1, isFileStorage(), isNetty());
+
+      servers[0].addProtocolManagerFactory(new ProtonProtocolManagerFactory());
+      servers[1].addProtocolManagerFactory(new ProtonProtocolManagerFactory());
+   }
+
+   protected void stopServers() throws Exception {
+      closeAllConsumers();
+
+      closeAllSessionFactories();
+
+      closeAllServerLocatorsFactories();
+
+      stopServers(0, 1);
+
+      clearServer(0, 1);
+   }
+
+   /**
+    * @param serverID
+    * @return
+    * @throws Exception
+    */
+   @Override
+   protected ConfigurationImpl createBasicConfig(final int serverID) {
+      ConfigurationImpl configuration = super.createBasicConfig(serverID);
+      configuration.setMessageExpiryScanPeriod(100);
+
+      return configuration;
+   }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ae2784d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/package-info.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/package-info.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/package-info.java
new file mode 100644
index 0000000..e9b7c01
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains tests about messages crossing over protocols
+ */
+package org.apache.activemq.artemis.tests.integration.cluster.crossprotocol;
\ No newline at end of file


[2/2] activemq-artemis git commit: This closes #2115

Posted by ma...@apache.org.
This closes #2115


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/545b82fb
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/545b82fb
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/545b82fb

Branch: refs/heads/master
Commit: 545b82fbd73598a640c5a33dc885f841da2087c1
Parents: 9b7ebef 1ae2784
Author: Martyn Taylor <mt...@redhat.com>
Authored: Thu May 31 10:48:12 2018 +0100
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Thu May 31 10:48:12 2018 +0100

----------------------------------------------------------------------
 .../activemq/artemis/api/core/Message.java      |  18 ++
 .../message/impl/CoreMessageObjectPools.java    |   4 +
 .../protocol/amqp/broker/AMQPMessage.java       |  44 ++-
 .../amqp/broker/AMQPSessionCallback.java        |   2 +-
 .../amqp/converter/AmqpCoreConverter.java       |   3 +
 .../amqp/converter/CoreAmqpConverter.java       |   2 +-
 .../protocol/amqp/message/AMQPMessageTest.java  |   4 +-
 .../core/postoffice/impl/PostOfficeImpl.java    |   4 +-
 .../ProtocolsMessageLoadBalancingTest.java      | 284 +++++++++++++++++++
 .../cluster/crossprotocol/package-info.java     |  20 ++
 10 files changed, 366 insertions(+), 19 deletions(-)
----------------------------------------------------------------------