You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2017/03/20 23:01:37 UTC
[1/3] activemq-artemis git commit: ARTEMIS-1052 adding a test for
expiry and AMQP
Repository: activemq-artemis
Updated Branches:
refs/heads/master 9385ce487 -> 150f67f86
ARTEMIS-1052 adding a test for expiry and AMQP
This closes #1106
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7ac27df7
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7ac27df7
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7ac27df7
Branch: refs/heads/master
Commit: 7ac27df7a0208714d70b920c1be9635bf0097eec
Parents: 9385ce4
Author: Jiri Danek <jd...@redhat.com>
Authored: Mon Mar 20 10:38:05 2017 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Mar 20 18:05:45 2017 -0400
----------------------------------------------------------------------
.../amqp/SendingAndReceivingTest.java | 43 ++++++++++++++++++++
1 file changed, 43 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7ac27df7/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java
index 46e3b95..3c4e915 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.amqp;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
+import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
@@ -26,8 +27,11 @@ import javax.jms.TextMessage;
import java.util.Random;
import java.util.Set;
+import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.After;
@@ -91,6 +95,45 @@ public class SendingAndReceivingTest extends ActiveMQTestBase {
}
}
+ @Test(timeout = 60000)
+ public void testSendMessageThenAllowToExpireUsingTimeToLive() throws Exception {
+ AddressSettings as = new AddressSettings();
+ as.setExpiryAddress(SimpleString.toSimpleString("DLQ"));
+ HierarchicalRepository<AddressSettings> repos = server.getAddressSettingsRepository();
+ repos.addMatch("exampleQueue", as);
+
+ Connection connection = null;
+ ConnectionFactory connectionFactory = new JmsConnectionFactory("amqp://localhost:61616");
+
+ try {
+ connection = connectionFactory.createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ String address = "exampleQueue";
+ Queue queue = session.createQueue(address);
+
+ MessageProducer sender = session.createProducer(queue);
+ sender.setTimeToLive(10);
+
+ Message message = session.createMessage();
+ sender.send(message);
+ connection.start();
+
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message m = consumer.receive(5000);
+ Assert.assertNull(m);
+ consumer.close();
+
+ consumer = session.createConsumer(session.createQueue("DLQ"));
+ m = consumer.receive(5000);
+ Assert.assertNotNull(m);
+ consumer.close();
+ } finally {
+ if (connection != null) {
+ connection.close();
+ }
+ }
+ }
+
private static String createMessage(int messageSize) {
final String AB = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
Random rnd = new Random();
[2/3] activemq-artemis git commit: ARTEMIS-1052 Proper Expiry over
AMQP
Posted by ta...@apache.org.
ARTEMIS-1052 Proper Expiry over AMQP
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/65ac7f70
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/65ac7f70
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/65ac7f70
Branch: refs/heads/master
Commit: 65ac7f700ba0a9409ecd345b77a991c5aedd53b9
Parents: 7ac27df
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Mar 20 12:24:42 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Mar 20 18:33:31 2017 -0400
----------------------------------------------------------------------
.../activemq/artemis/api/core/Message.java | 44 +++++--
.../artemis/core/message/impl/CoreMessage.java | 4 +-
.../protocol/amqp/broker/AMQPMessage.java | 105 +++++++++++----
.../amqp/proton/ProtonServerSenderContext.java | 2 +-
.../core/protocol/openwire/OpenwireMessage.java | 4 +-
.../impl/journal/LargeServerMessageImpl.java | 2 +-
.../core/postoffice/impl/BindingsImpl.java | 4 +-
.../core/postoffice/impl/PostOfficeImpl.java | 4 +-
.../artemis/core/server/impl/DivertImpl.java | 2 +
.../artemis/core/server/impl/QueueImpl.java | 18 ++-
.../impl/ScheduledDeliveryHandlerTest.java | 4 +-
.../transport/amqp/client/AmqpMessage.java | 2 +-
.../integration/amqp/AmqpClientTestSupport.java | 88 +------------
.../integration/amqp/AmqpNettyFailoverTest.java | 1 -
.../tests/integration/amqp/AmqpTestSupport.java | 127 +++++++++++++++++++
.../amqp/SendingAndReceivingTest.java | 94 ++++++++++++--
.../integration/client/AcknowledgeTest.java | 4 +-
17 files changed, 359 insertions(+), 150 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/65ac7f70/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index ec0a2db..56097ae 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -364,23 +364,30 @@ public interface Message {
}
setBuffer(null);
}
+
+ default void reencode() {
+ // probably only valid on AMQP
+ }
+
default void referenceOriginalMessage(final Message original, String originalQueue) {
- String queueOnMessage = original.getStringProperty(Message.HDR_ORIGINAL_QUEUE.toString());
+ String queueOnMessage = original.getAnnotationString(Message.HDR_ORIGINAL_QUEUE);
if (queueOnMessage != null) {
- putStringProperty(Message.HDR_ORIGINAL_QUEUE.toString(), queueOnMessage);
+ setAnnotation(Message.HDR_ORIGINAL_QUEUE, queueOnMessage);
} else if (originalQueue != null) {
- putStringProperty(Message.HDR_ORIGINAL_QUEUE.toString(), originalQueue);
+ setAnnotation(Message.HDR_ORIGINAL_QUEUE, originalQueue);
}
- if (original.containsProperty(Message.HDR_ORIG_MESSAGE_ID.toString())) {
- putStringProperty(Message.HDR_ORIGINAL_ADDRESS.toString(), original.getStringProperty(Message.HDR_ORIGINAL_ADDRESS.toString()));
+ Object originalID = original.getAnnotation(Message.HDR_ORIG_MESSAGE_ID);
+
+ if (originalID != null) {
+ setAnnotation(Message.HDR_ORIGINAL_ADDRESS, original.getAnnotationString(Message.HDR_ORIGINAL_ADDRESS));
- putLongProperty(Message.HDR_ORIG_MESSAGE_ID.toString(), original.getLongProperty(Message.HDR_ORIG_MESSAGE_ID.toString()));
+ setAnnotation(Message.HDR_ORIG_MESSAGE_ID, originalID);
} else {
- putStringProperty(Message.HDR_ORIGINAL_ADDRESS.toString(), original.getAddress());
+ setAnnotation(Message.HDR_ORIGINAL_ADDRESS, original.getAddress());
- putLongProperty(Message.HDR_ORIG_MESSAGE_ID.toString(), original.getMessageID());
+ setAnnotation(Message.HDR_ORIG_MESSAGE_ID, original.getMessageID());
}
// reset expiry
@@ -503,9 +510,26 @@ public interface Message {
Object getObjectProperty(SimpleString key);
- Object removeDeliveryAnnotationProperty(SimpleString key);
+ default Object removeAnnotation(SimpleString key) {
+ return removeProperty(key);
+ }
- Object getDeliveryAnnotationProperty(SimpleString key);
+ default String getAnnotationString(SimpleString key) {
+ Object value = getAnnotation(key);
+ if (value != null) {
+ return value.toString();
+ } else {
+ return null;
+ }
+ }
+
+ Object getAnnotation(SimpleString key);
+
+ /** Callers must call {@link #reencode()} afterwards in order for the change to be sent to clients */
+ default Message setAnnotation(SimpleString key, Object value) {
+ putObjectProperty(key, value);
+ return this;
+ }
Short getShortProperty(SimpleString key) throws ActiveMQPropertyConversionException;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/65ac7f70/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index ce1ea96..f0a8715 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -98,13 +98,13 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
/** On core there's no delivery annotation */
@Override
- public Object getDeliveryAnnotationProperty(SimpleString key) {
+ public Object getAnnotation(SimpleString key) {
return getObjectProperty(key);
}
/** On core there's no delivery annotation */
@Override
- public Object removeDeliveryAnnotationProperty(SimpleString key) {
+ public Object removeAnnotation(SimpleString key) {
return removeProperty(key);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/65ac7f70/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index c1c676c..e01d430 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.protocol.amqp.broker;
import java.nio.ByteBuffer;
import java.util.Collections;
+import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -62,14 +63,14 @@ public class AMQPMessage extends RefCountMessage {
final long messageFormat;
ByteBuf data;
boolean bufferValid;
- byte type;
+ boolean durable;
long messageID;
String address;
MessageImpl protonMessage;
private volatile int memoryEstimate = -1;
private long expiration = 0;
// this is to store where to start sending bytes, ignoring header and delivery annotations.
- private int sendFrom = -1;
+ private int sendFrom = 0;
private boolean parsedHeaders = false;
private Header _header;
private DeliveryAnnotations _deliveryAnnotations;
@@ -123,7 +124,7 @@ public class AMQPMessage extends RefCountMessage {
private void initalizeObjects() {
if (protonMessage == null) {
if (data == null) {
- this.sendFrom = -1;
+ this.sendFrom = 0;
_header = new Header();
_deliveryAnnotations = new DeliveryAnnotations(new HashMap<>());
_properties = new Properties();
@@ -220,12 +221,27 @@ public class AMQPMessage extends RefCountMessage {
return null;
}
+ private Object removeSymbol(Symbol symbol) {
+ MessageAnnotations annotations = getMessageAnnotations();
+ Map mapAnnotations = annotations != null ? annotations.getValue() : null;
+ if (mapAnnotations != null) {
+ return mapAnnotations.remove(symbol);
+ }
+
+ return null;
+ }
+
+
private void setSymbol(String symbol, Object value) {
setSymbol(Symbol.getSymbol(symbol), value);
}
private void setSymbol(Symbol symbol, Object value) {
MessageAnnotations annotations = getMessageAnnotations();
+ if (annotations == null) {
+ _messageAnnotations = new MessageAnnotations(new HashMap<>());
+ annotations = _messageAnnotations;
+ }
Map mapAnnotations = annotations != null ? annotations.getValue() : null;
if (mapAnnotations != null) {
mapAnnotations.put(symbol, value);
@@ -408,7 +424,14 @@ public class AMQPMessage extends RefCountMessage {
@Override
public org.apache.activemq.artemis.api.core.Message copy() {
checkBuffer();
- AMQPMessage newEncode = new AMQPMessage(this.messageFormat, data.array());
+
+ byte[] origin = data.array();
+ byte[] newData = new byte[data.array().length - sendFrom];
+ for (int i = 0; i < newData.length; i++) {
+ newData[i] = origin[i + sendFrom];
+ }
+ AMQPMessage newEncode = new AMQPMessage(this.messageFormat, newData);
+ newEncode.setDurable(isDurable());
return newEncode;
}
@@ -436,6 +459,16 @@ public class AMQPMessage extends RefCountMessage {
@Override
public AMQPMessage setExpiration(long expiration) {
+
+ Properties properties = getProperties();
+
+ if (properties != null) {
+ if (expiration <= 0) {
+ properties.setAbsoluteExpiryTime(null);
+ } else {
+ properties.setAbsoluteExpiryTime(new Date(expiration));
+ }
+ }
this.expiration = expiration;
return this;
}
@@ -460,7 +493,7 @@ public class AMQPMessage extends RefCountMessage {
if (getHeader() != null && getHeader().getDurable() != null) {
return getHeader().getDurable().booleanValue();
} else {
- return false;
+ return durable;
}
}
@@ -471,7 +504,8 @@ public class AMQPMessage extends RefCountMessage {
@Override
public org.apache.activemq.artemis.api.core.Message setDurable(boolean durable) {
- return null;
+ this.durable = durable;
+ return this;
}
@Override
@@ -544,11 +578,19 @@ public class AMQPMessage extends RefCountMessage {
}
@Override
+ public int getEncodeSize() {
+ checkBuffer();
+ // + 20 is an estimate for the Header with the deliveryCount
+ return data.array().length - sendFrom + 20;
+ }
+
+ @Override
public void sendBuffer(ByteBuf buffer, int deliveryCount) {
checkBuffer();
Header header = getHeader();
if (header == null && deliveryCount > 0) {
header = new Header();
+ header.setDurable(durable);
}
if (header != null) {
synchronized (header) {
@@ -756,20 +798,37 @@ public class AMQPMessage extends RefCountMessage {
}
@Override
- public Object removeDeliveryAnnotationProperty(SimpleString key) {
- parseHeaders();
- if (_deliveryAnnotations == null || _deliveryAnnotations.getValue() == null) {
- return null;
- }
- return _deliveryAnnotations.getValue().remove(key.toString());
+ public Object removeAnnotation(SimpleString key) {
+ return removeSymbol(Symbol.getSymbol(key.toString()));
}
@Override
- public Object getDeliveryAnnotationProperty(SimpleString key) {
- return null;
+ public Object getAnnotation(SimpleString key) {
+ return getSymbol(key.toString());
}
@Override
+ public AMQPMessage setAnnotation(SimpleString key, Object value) {
+ setSymbol(key.toString(), value);
+ return this;
+ }
+
+
+ @Override
+ public void reencode() {
+ parseHeaders();
+ ApplicationProperties properties = getApplicationProperties();
+ getProtonMessage().setDeliveryAnnotations(_deliveryAnnotations);
+ getProtonMessage().setMessageAnnotations(_messageAnnotations);
+ getProtonMessage().setApplicationProperties(properties);
+ getProtonMessage().setProperties(this._properties);
+ bufferValid = false;
+ checkBuffer();
+ }
+
+
+
+ @Override
public SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException {
return SimpleString.toSimpleString((String) getApplicationPropertiesMap().get(key));
}
@@ -850,11 +909,6 @@ public class AMQPMessage extends RefCountMessage {
}
@Override
- public int getEncodeSize() {
- return 0;
- }
-
- @Override
public Set<SimpleString> getPropertyNames() {
HashSet<SimpleString> values = new HashSet<>();
for (Object k : getApplicationPropertiesMap().keySet()) {
@@ -901,15 +955,18 @@ public class AMQPMessage extends RefCountMessage {
@Override
public int getPersistSize() {
- checkBuffer();
- return data.array().length + DataConstants.SIZE_INT;
+ return DataConstants.SIZE_INT + internalPersistSize();
+ }
+
+ private int internalPersistSize() {
+ return data.array().length - sendFrom;
}
@Override
public void persist(ActiveMQBuffer targetRecord) {
checkBuffer();
- targetRecord.writeInt(data.array().length);
- targetRecord.writeBytes(data.array());
+ targetRecord.writeInt(internalPersistSize());
+ targetRecord.writeBytes(data.array(), sendFrom, data.array().length - sendFrom);
}
@Override
@@ -917,8 +974,10 @@ public class AMQPMessage extends RefCountMessage {
int size = record.readInt();
byte[] recordArray = new byte[size];
record.readBytes(recordArray);
+ this.sendFrom = 0; // whatever was persisted will be sent
this.data = Unpooled.wrappedBuffer(recordArray);
this.bufferValid = true;
+ this.durable = true; // it's coming from the journal, so it's durable
parseHeaders();
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/65ac7f70/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 5a97c02..d24464c 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -594,7 +594,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
// we only need a tag if we are going to settle later
byte[] tag = preSettle ? new byte[0] : protonSession.getTag();
- ByteBuf nettyBuffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
+ ByteBuf nettyBuffer = PooledByteBufAllocator.DEFAULT.heapBuffer(message.getEncodeSize());
try {
message.sendBuffer(nettyBuffer, deliveryCount);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/65ac7f70/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
index 9fb6eb9..3bd95f4 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
@@ -58,12 +58,12 @@ public class OpenwireMessage implements Message {
}
@Override
- public Object removeDeliveryAnnotationProperty(SimpleString key) {
+ public Object removeAnnotation(SimpleString key) {
return null;
}
@Override
- public Object getDeliveryAnnotationProperty(SimpleString key) {
+ public Object getAnnotation(SimpleString key) {
return null;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/65ac7f70/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
index 46bd335..2c3e01c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
@@ -242,7 +242,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
LargeServerMessageImpl otherLM = (LargeServerMessageImpl) original;
this.paged = otherLM.paged;
if (this.paged) {
- this.removeProperty(Message.HDR_ORIG_MESSAGE_ID);
+ this.removeAnnotation(Message.HDR_ORIG_MESSAGE_ID);
}
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/65ac7f70/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
index c7b6024..377223b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
@@ -238,7 +238,7 @@ public final class BindingsImpl implements Bindings {
/* This is a special treatment for scaled-down messages involving SnF queues.
* See org.apache.activemq.artemis.core.server.impl.ScaleDownHandler.scaleDownMessages() for the logic that sends messages with this property
*/
- byte[] ids = (byte[]) message.removeDeliveryAnnotationProperty(Message.HDR_SCALEDOWN_TO_IDS);
+ byte[] ids = (byte[]) message.removeAnnotation(Message.HDR_SCALEDOWN_TO_IDS);
if (ids != null) {
ByteBuffer buffer = ByteBuffer.wrap(ids);
@@ -268,7 +268,7 @@ public final class BindingsImpl implements Bindings {
if (!routed) {
// Remove the ids now, in order to avoid double check
- ids = (byte[]) message.removeDeliveryAnnotationProperty(Message.HDR_ROUTE_TO_IDS);
+ ids = (byte[]) message.removeAnnotation(Message.HDR_ROUTE_TO_IDS);
// Fetch the groupId now, in order to avoid double checking
SimpleString groupId = message.getGroupID();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/65ac7f70/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 1f51210..2ef7657 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -765,6 +765,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
message.setAddress(dlaAddress);
+ message.reencode();
+
route(message, context.getTransaction(), false);
result = RoutingStatus.NO_BINDINGS_DLA;
}
@@ -1221,7 +1223,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
AtomicBoolean startedTX) throws Exception {
// Check the DuplicateCache for the Bridge first
- Object bridgeDup = message.removeDeliveryAnnotationProperty(Message.HDR_BRIDGE_DUPLICATE_ID);
+ Object bridgeDup = message.removeAnnotation(Message.HDR_BRIDGE_DUPLICATE_ID);
if (bridgeDup != null) {
// if the message is being sent from the bridge, we just ignore the duplicate id, and use the internal one
byte[] bridgeDupBytes = (byte[]) bridgeDup;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/65ac7f70/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
index 5b0d406..c73fd80 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
@@ -104,6 +104,8 @@ public class DivertImpl implements Divert {
copy.setExpiration(message.getExpiration());
+ copy.reencode();
+
switch (routingType) {
case ANYCAST:
copy.putByteProperty(Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/65ac7f70/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index fc655f6..406ba5d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1718,14 +1718,14 @@ public class QueueImpl implements Queue {
@Override
public int retryMessages(Filter filter) throws Exception {
- final HashMap<SimpleString, Long> queues = new HashMap<>();
+ final HashMap<String, Long> queues = new HashMap<>();
return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
@Override
public void actMessage(Transaction tx, MessageReference ref) throws Exception {
- SimpleString originalMessageAddress = ref.getMessage().getSimpleStringProperty(Message.HDR_ORIGINAL_ADDRESS);
- SimpleString originalMessageQueue = ref.getMessage().getSimpleStringProperty(Message.HDR_ORIGINAL_QUEUE);
+ String originalMessageAddress = ref.getMessage().getAnnotationString(Message.HDR_ORIGINAL_ADDRESS);
+ String originalMessageQueue = ref.getMessage().getAnnotationString(Message.HDR_ORIGINAL_QUEUE);
if (originalMessageAddress != null) {
@@ -1735,7 +1735,7 @@ public class QueueImpl implements Queue {
if (originalMessageQueue != null && !originalMessageQueue.equals(originalMessageAddress)) {
targetQueue = queues.get(originalMessageQueue);
if (targetQueue == null) {
- Binding binding = postOffice.getBinding(originalMessageQueue);
+ Binding binding = postOffice.getBinding(SimpleString.toSimpleString(originalMessageQueue));
if (binding != null && binding instanceof LocalQueueBinding) {
targetQueue = ((LocalQueueBinding) binding).getID();
@@ -1745,9 +1745,9 @@ public class QueueImpl implements Queue {
}
if (targetQueue != null) {
- move(originalMessageAddress, tx, ref, false, false, targetQueue.longValue());
+ move(SimpleString.toSimpleString(originalMessageAddress), tx, ref, false, false, targetQueue.longValue());
} else {
- move(originalMessageAddress, tx, ref, false, false);
+ move(SimpleString.toSimpleString(originalMessageAddress), tx, ref, false, false);
}
@@ -2495,10 +2495,14 @@ public class QueueImpl implements Queue {
copy.referenceOriginalMessage(message, ref != null ? ref.getQueue().getName().toString() : null);
}
+ copy.setExpiration(0);
+
if (expiry) {
- copy.putLongProperty(Message.HDR_ACTUAL_EXPIRY_TIME.toString(), System.currentTimeMillis());
+ copy.setAnnotation(Message.HDR_ACTUAL_EXPIRY_TIME, System.currentTimeMillis());
}
+ copy.reencode();
+
return copy;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/65ac7f70/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 8d06286..dd48b58 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -299,12 +299,12 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
- public Object removeDeliveryAnnotationProperty(SimpleString key) {
+ public Object removeAnnotation(SimpleString key) {
return null;
}
@Override
- public Object getDeliveryAnnotationProperty(SimpleString key) {
+ public Object getAnnotation(SimpleString key) {
return null;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/65ac7f70/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
index 8599fa9..bf9e0b5 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -471,7 +471,7 @@ public class AmqpMessage {
/**
* Sets the creation time property on the message.
*
- * @param absoluteExpiryTime the expiration time value to set.
+ * @param creationTime the time value to set.
*/
public void setCreationTime(long creationTime) {
checkReadOnly();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/65ac7f70/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
index d9b45d3..99ce4db 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
@@ -17,8 +17,6 @@
package org.apache.activemq.artemis.tests.integration.amqp;
-import java.net.URI;
-import java.util.LinkedList;
import java.util.Set;
import org.apache.activemq.artemis.api.core.SimpleString;
@@ -32,7 +30,6 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.server.JMSServerManager;
import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
-import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
@@ -46,23 +43,14 @@ import org.junit.Before;
* Test support class for tests that will be using the AMQP Proton wrapper client.
* This is to make it easier to migrate tests from ActiveMQ5
*/
-public class AmqpClientTestSupport extends ActiveMQTestBase {
+public class AmqpClientTestSupport extends AmqpTestSupport {
protected static Symbol SHARED = Symbol.getSymbol("shared");
protected static Symbol GLOBAL = Symbol.getSymbol("global");
- private boolean useSSL;
-
protected JMSServerManager serverManager;
protected ActiveMQServer server;
- protected LinkedList<AmqpConnection> connections = new LinkedList<>();
-
- protected AmqpConnection addConnection(AmqpConnection connection) {
- connections.add(connection);
- return connection;
- }
-
@Before
@Override
public void setUp() throws Exception {
@@ -80,6 +68,7 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
ignored.printStackTrace();
}
}
+ connections.clear();
if (serverManager != null) {
try {
@@ -149,79 +138,6 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
this.useSSL = useSSL;
}
- public boolean isUseSSL() {
- return useSSL;
- }
-
- public String getAmqpConnectionURIOptions() {
- return "";
- }
-
- public URI getBrokerAmqpConnectionURI() {
- boolean webSocket = false;
-
- try {
- int port = 61616;
-
- String uri = null;
-
- if (isUseSSL()) {
- if (webSocket) {
- uri = "wss://127.0.0.1:" + port;
- } else {
- uri = "ssl://127.0.0.1:" + port;
- }
- } else {
- if (webSocket) {
- uri = "ws://127.0.0.1:" + port;
- } else {
- uri = "tcp://127.0.0.1:" + port;
- }
- }
-
- if (!getAmqpConnectionURIOptions().isEmpty()) {
- uri = uri + "?" + getAmqpConnectionURIOptions();
- }
-
- return new URI(uri);
- } catch (Exception e) {
- throw new RuntimeException();
- }
- }
-
- public AmqpConnection createAmqpConnection() throws Exception {
- return createAmqpConnection(getBrokerAmqpConnectionURI());
- }
-
- public AmqpConnection createAmqpConnection(String username, String password) throws Exception {
- return createAmqpConnection(getBrokerAmqpConnectionURI(), username, password);
- }
-
- public AmqpConnection createAmqpConnection(URI brokerURI) throws Exception {
- return createAmqpConnection(brokerURI, null, null);
- }
-
- public AmqpConnection createAmqpConnection(URI brokerURI, String username, String password) throws Exception {
- return createAmqpClient(brokerURI, username, password).connect();
- }
-
- public AmqpClient createAmqpClient() throws Exception {
- return createAmqpClient(getBrokerAmqpConnectionURI(), null, null);
- }
-
- public AmqpClient createAmqpClient(URI brokerURI) throws Exception {
- return createAmqpClient(brokerURI, null, null);
- }
-
- public AmqpClient createAmqpClient(String username, String password) throws Exception {
- return createAmqpClient(getBrokerAmqpConnectionURI(), username, password);
- }
-
- public AmqpClient createAmqpClient(URI brokerURI, String username, String password) throws Exception {
- return new AmqpClient(brokerURI, username, password);
- }
-
-
protected void sendMessages(int numMessages, String address) throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/65ac7f70/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNettyFailoverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNettyFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNettyFailoverTest.java
index 5496883..5fb4e35 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNettyFailoverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNettyFailoverTest.java
@@ -18,7 +18,6 @@ package org.apache.activemq.artemis.tests.integration.amqp;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
-import org.apache.activemq.artemis.tests.integration.cluster.failover.FailoverTest;
import org.apache.activemq.artemis.tests.integration.cluster.failover.FailoverTestBase;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.Assert;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/65ac7f70/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java
new file mode 100644
index 0000000..8f89452
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.net.URI;
+import java.util.LinkedList;
+
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.junit.After;
+
+/** Adds helper methods for AMQP client tests; it does not create or start any server. */
+public class AmqpTestSupport extends ActiveMQTestBase {
+   /** Connections registered through addConnection; tearDown closes each of them. */
+   protected LinkedList<AmqpConnection> connections = new LinkedList<>();
+
+   protected boolean useSSL;
+
+   /** Registers a connection so that tearDown closes it, and returns the same instance. */
+   protected AmqpConnection addConnection(AmqpConnection connection) {
+      connections.add(connection);
+      return connection;
+   }
+
+   /** Closes every registered connection (best effort) before running the parent tearDown. */
+   @After
+   @Override
+   public void tearDown() throws Exception {
+      for (AmqpConnection conn : connections) {
+         try {
+            conn.close();
+         } catch (Throwable ignored) {
+            ignored.printStackTrace();
+         }
+      }
+      // Drop the closed connections so a repeated tearDown does not close them again.
+      connections.clear();
+
+      super.tearDown();
+   }
+
+   public boolean isUseSSL() {
+      return useSSL;
+   }
+
+   /** Options appended after "?" to the broker URI; an empty string means none. */
+   public String getAmqpConnectionURIOptions() {
+      return "";
+   }
+
+   /**
+    * Builds the broker URI for 127.0.0.1:61616 ("ssl" when isUseSSL() is true, else "tcp").
+    * @throws RuntimeException wrapping the cause when the URI cannot be built
+    */
+   public URI getBrokerAmqpConnectionURI() {
+      // Web sockets are never selected here; the flag only keeps the ws/wss schemes visible.
+      boolean webSocket = false;
+
+      try {
+         int port = 61616;
+         String scheme;
+         if (isUseSSL()) {
+            scheme = webSocket ? "wss" : "ssl";
+         } else {
+            scheme = webSocket ? "ws" : "tcp";
+         }
+         String uri = scheme + "://127.0.0.1:" + port;
+
+         String options = getAmqpConnectionURIOptions();
+         if (!options.isEmpty()) {
+            uri = uri + "?" + options;
+         }
+         return new URI(uri);
+      } catch (Exception e) {
+         // Preserve the cause so a rejected URI can be diagnosed from the stack trace.
+         throw new RuntimeException("Failed to build broker AMQP connection URI", e);
+      }
+   }
+
+   public AmqpConnection createAmqpConnection() throws Exception {
+      return createAmqpConnection(getBrokerAmqpConnectionURI());
+   }
+
+   public AmqpConnection createAmqpConnection(String username, String password) throws Exception {
+      return createAmqpConnection(getBrokerAmqpConnectionURI(), username, password);
+   }
+
+   public AmqpConnection createAmqpConnection(URI brokerURI) throws Exception {
+      return createAmqpConnection(brokerURI, null, null);
+   }
+
+   public AmqpConnection createAmqpConnection(URI brokerURI, String username, String password) throws Exception {
+      return createAmqpClient(brokerURI, username, password).connect();
+   }
+
+   public AmqpClient createAmqpClient() throws Exception {
+      return createAmqpClient(getBrokerAmqpConnectionURI(), null, null);
+   }
+
+   public AmqpClient createAmqpClient(URI brokerURI) throws Exception {
+      return createAmqpClient(brokerURI, null, null);
+   }
+
+   public AmqpClient createAmqpClient(String username, String password) throws Exception {
+      return createAmqpClient(getBrokerAmqpConnectionURI(), username, password);
+   }
+
+   public AmqpClient createAmqpClient(URI brokerURI, String username, String password) throws Exception {
+      return new AmqpClient(brokerURI, username, password);
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/65ac7f70/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java
index 3c4e915..87a4710 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java
@@ -26,20 +26,28 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
-import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-public class SendingAndReceivingTest extends ActiveMQTestBase {
+public class SendingAndReceivingTest extends AmqpTestSupport {
private ActiveMQServer server;
@@ -55,7 +63,15 @@ public class SendingAndReceivingTest extends ActiveMQTestBase {
tc.getExtraParams().put("multicastPrefix", "multicast://");
}
}
+ server.getConfiguration().setMessageExpiryScanPeriod(1);
server.start();
+ server.createQueue(SimpleString.toSimpleString("exampleQueue"), RoutingType.ANYCAST, SimpleString.toSimpleString("exampleQueue"), null, true, false, -1, false, true);
+ server.createQueue(SimpleString.toSimpleString("DLQ"), RoutingType.ANYCAST, SimpleString.toSimpleString("DLQ"), null, true, false, -1, false, true);
+
+ AddressSettings as = new AddressSettings();
+ as.setExpiryAddress(SimpleString.toSimpleString("DLQ"));
+ HierarchicalRepository<AddressSettings> repos = server.getAddressSettingsRepository();
+ repos.addMatch("#", as);
}
@After
@@ -112,21 +128,24 @@ public class SendingAndReceivingTest extends ActiveMQTestBase {
Queue queue = session.createQueue(address);
MessageProducer sender = session.createProducer(queue);
- sender.setTimeToLive(10);
+ sender.setTimeToLive(1);
Message message = session.createMessage();
sender.send(message);
connection.start();
- MessageConsumer consumer = session.createConsumer(queue);
- Message m = consumer.receive(5000);
- Assert.assertNull(m);
+ MessageConsumer consumer = session.createConsumer(session.createQueue("DLQ"));
+ Message m = consumer.receive(10000);
+ Assert.assertNotNull(m);
consumer.close();
- consumer = session.createConsumer(session.createQueue("DLQ"));
- m = consumer.receive(5000);
- Assert.assertNotNull(m);
+
+ consumer = session.createConsumer(queue);
+ m = consumer.receiveNoWait();
+ Assert.assertNull(m);
consumer.close();
+
+
} finally {
if (connection != null) {
connection.close();
@@ -134,6 +153,63 @@ public class SendingAndReceivingTest extends ActiveMQTestBase {
}
}
+ @Test(timeout = 60000)
+ public void testSendExpiry() throws Throwable {
+ internalSendExpiry(false); // restartServer = false: the server is not stopped and started
+ }
+
+ @Test(timeout = 60000)
+ public void testSendExpiryRestartServer() throws Throwable {
+ internalSendExpiry(true); // restartServer = true: the server is stopped and started before receiving
+ }
+
+   /**
+    * Sends a durable AMQP message whose absolute expiry is one second in the future and verifies
+    * that it can be received from "DLQ", optionally after stopping and starting the server.
+    */
+   public void internalSendExpiry(boolean restartServer) throws Throwable {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = client.connect();
+      try {
+         // Normal Session which won't create a TXN itself
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender("exampleQueue");
+         AmqpMessage message = new AmqpMessage();
+         message.setDurable(true);
+         message.setText("Test-Message");
+         message.setDeliveryAnnotation("shouldDisappear", 1);
+         message.setAbsoluteExpiryTime(System.currentTimeMillis() + 1000);
+         sender.send(message);
+
+         org.apache.activemq.artemis.core.server.Queue dlq = server.locateQueue(SimpleString.toSimpleString("DLQ"));
+         // Fail right here if the expired message never shows up, instead of ignoring the result.
+         Assert.assertTrue("expired message did not reach DLQ", Wait.waitFor(() -> dlq.getMessageCount() > 0, 5000, 500));
+
+         connection.close();
+
+         if (restartServer) {
+            server.stop();
+            server.start();
+         }
+
+         connection = client.connect();
+         session = connection.createSession();
+         // Read all messages from the Queue, do not accept them yet.
+         AmqpReceiver receiver = session.createReceiver("DLQ");
+         receiver.flow(20);
+         message = receiver.receive(5, TimeUnit.SECONDS);
+         Assert.assertNotNull(message);
+         Assert.assertEquals("exampleQueue", message.getMessageAnnotation(org.apache.activemq.artemis.api.core.Message.HDR_ORIGINAL_ADDRESS.toString()));
+         // The delivery annotation set before sending must be gone from the expired copy.
+         Assert.assertNull(message.getDeliveryAnnotation("shouldDisappear"));
+         Assert.assertNull(receiver.receiveNoWait());
+      } finally {
+         connection.close();
+      }
+   }
+
+
+
private static String createMessage(int messageSize) {
final String AB = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
Random rnd = new Random();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/65ac7f70/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
index 43dad84..31e26e3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
@@ -357,12 +357,12 @@ public class AcknowledgeTest extends ActiveMQTestBase {
}
@Override
- public Object removeDeliveryAnnotationProperty(SimpleString key) {
+ public Object removeAnnotation(SimpleString key) {
return null;
}
@Override
- public Object getDeliveryAnnotationProperty(SimpleString key) {
+ public Object getAnnotation(SimpleString key) {
return null;
}
[3/3] activemq-artemis git commit: This closes #1110
Posted by ta...@apache.org.
This closes #1110
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/150f67f8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/150f67f8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/150f67f8
Branch: refs/heads/master
Commit: 150f67f8622cb8b67e7b68e327db387178f6ad70
Parents: 9385ce4 65ac7f7
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Mar 20 19:00:59 2017 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Mar 20 19:00:59 2017 -0400
----------------------------------------------------------------------
.../activemq/artemis/api/core/Message.java | 44 +++++--
.../artemis/core/message/impl/CoreMessage.java | 4 +-
.../protocol/amqp/broker/AMQPMessage.java | 105 +++++++++++----
.../amqp/proton/ProtonServerSenderContext.java | 2 +-
.../core/protocol/openwire/OpenwireMessage.java | 4 +-
.../impl/journal/LargeServerMessageImpl.java | 2 +-
.../core/postoffice/impl/BindingsImpl.java | 4 +-
.../core/postoffice/impl/PostOfficeImpl.java | 4 +-
.../artemis/core/server/impl/DivertImpl.java | 2 +
.../artemis/core/server/impl/QueueImpl.java | 18 ++-
.../impl/ScheduledDeliveryHandlerTest.java | 4 +-
.../transport/amqp/client/AmqpMessage.java | 2 +-
.../integration/amqp/AmqpClientTestSupport.java | 88 +------------
.../integration/amqp/AmqpNettyFailoverTest.java | 1 -
.../tests/integration/amqp/AmqpTestSupport.java | 127 +++++++++++++++++++
.../amqp/SendingAndReceivingTest.java | 123 +++++++++++++++++-
.../integration/client/AcknowledgeTest.java | 4 +-
17 files changed, 395 insertions(+), 143 deletions(-)
----------------------------------------------------------------------