You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/03/03 01:05:26 UTC
[33/36] activemq-artemis git commit: Fixing Scheduled Message
Fixing Scheduled Message
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ef95eaa8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ef95eaa8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ef95eaa8
Branch: refs/heads/artemis-1009
Commit: ef95eaa848bb8d73271d60c2030be336c3862713
Parents: 4246128
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Mar 1 17:43:51 2017 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Mar 2 20:04:30 2017 -0500
----------------------------------------------------------------------
.../activemq/artemis/api/core/Message.java | 3 +
.../artemis/core/message/impl/CoreMessage.java | 11 ++++
.../protocol/amqp/broker/AMQPMessage.java | 69 ++++++++++++--------
.../core/postoffice/impl/PostOfficeImpl.java | 18 +++--
.../management/impl/ManagementServiceImpl.java | 3 +-
.../impl/ScheduledDeliveryHandlerTest.java | 9 ++-
.../tests/integration/amqp/ProtonTest.java | 49 +++++++-------
.../integration/client/AcknowledgeTest.java | 5 ++
8 files changed, 99 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ef95eaa8/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index d96f232..b08202d 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -169,6 +169,9 @@ public interface Message {
void messageChanged();
+ /**
+ * Returns the time at which this message is scheduled to be delivered.
+ *
+ * @return the scheduled delivery time, or {@code null} if the message is not scheduled
+ */
+ Long getScheduledDeliveryTime();
/** Used for Large messages on Core.
* Do not use this, it will go away
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ef95eaa8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index 513b758..f620a1d 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -153,6 +153,17 @@ public class CoreMessage extends RefCountMessage {
}
}
+ /**
+ * Reads the scheduled delivery time from the {@code HDR_SCHEDULED_DELIVERY_TIME} property.
+ *
+ * @return the property converted to a long when it holds a {@link Number}; {@code null} otherwise
+ */
+ @Override
+ public Long getScheduledDeliveryTime() {
+ final Object scheduled = getObjectProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
+
+ // instanceof is false for null, so a missing property also ends up here
+ if (!(scheduled instanceof Number)) {
+ return null;
+ }
+
+ return ((Number) scheduled).longValue();
+ }
+
/**
* {@inheritDoc}
*/
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ef95eaa8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index d39bf9d..772f2cd 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -37,6 +37,7 @@ import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
import org.apache.activemq.artemis.utils.DataConstants;
+import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
@@ -63,6 +64,7 @@ public class AMQPMessage extends RefCountMessage {
private long expiration = 0;
// this can be used to encode the header again and the rest of the message buffer
private int headerEnd = -1;
+ private boolean parsedHeaders = false;
private Header _header;
private DeliveryAnnotations _deliveryAnnotations;
private MessageAnnotations _messageAnnotations;
@@ -142,39 +144,55 @@ public class AMQPMessage extends RefCountMessage {
}
private ApplicationProperties getApplicationProperties() {
- if (applicationProperties == null) {
- if (data != null) {
- partialDecode(data.nioBuffer(), true);
- } else {
- initalizeObjects();
- }
- }
-
+ parseHeaders();
return applicationProperties;
}
- public Header getHeader() {
- if (_header == null) {
+ private void parseHeaders() {
+ if (!parsedHeaders) {
if (data == null) {
initalizeObjects();
} else {
- partialDecode(this.data.nioBuffer(), false);
+ partialDecode(data.nioBuffer());
}
+ parsedHeaders = true;
}
+ }
+ /**
+ * Returns the message annotations section, calling {@code parseHeaders()} first.
+ *
+ * @return the message annotations; NOTE(review): presumably {@code null} when the
+ * message carries no such section (getSymbol guards against null) - confirm
+ */
+ public MessageAnnotations getMessageAnnotations() {
+ parseHeaders();
+ return _messageAnnotations;
+ }
+
+ public Header getHeader() {
+ parseHeaders();
return _header;
}
public Properties getProperties() {
- if (_properties == null) {
- if (data == null) {
- initalizeObjects();
- } else {
- partialDecode(this.data.nioBuffer(), true);
- }
+ parseHeaders();
+ return _properties;
+ }
+
+ /**
+ * Looks up a single entry of the message annotations by its symbol name.
+ *
+ * @param symbol name of the annotation key to look up
+ * @return the annotation value, or {@code null} when the message has no annotations
+ * or no entry for that key
+ */
+ private Object getSymbol(String symbol) {
+ MessageAnnotations annotations = getMessageAnnotations();
+ Map<?, ?> mapAnnotations = annotations != null ? annotations.getValue() : null;
+ if (mapAnnotations != null) {
+ // honour the requested key instead of a fixed "x-opt-delivery-time" lookup
+ return mapAnnotations.get(Symbol.getSymbol(symbol));
+ }
- return _properties;
+ return null;
+ }
+
+ /**
+ * Reads the scheduled delivery time through {@code getSymbol("x-opt-delivery-time")}.
+ *
+ * @return the looked-up value converted to a long when it is a {@link Number};
+ * {@code null} otherwise
+ */
+ @Override
+ public Long getScheduledDeliveryTime() {
+ final Object annotation = getSymbol("x-opt-delivery-time");
+
+ // instanceof already rejects null, so no separate null check is needed
+ if (!(annotation instanceof Number)) {
+ return null;
+ }
+
+ return ((Number) annotation).longValue();
}
@Override
@@ -182,7 +200,7 @@ public class AMQPMessage extends RefCountMessage {
return AMQPMessagePersister.getInstance();
}
- private synchronized void partialDecode(ByteBuffer buffer, boolean readApplicationProperties) {
+ private synchronized void partialDecode(ByteBuffer buffer) {
DecoderImpl decoder = TLSEncode.getDecoder();
decoder.setByteBuffer(buffer);
buffer.position(0);
@@ -207,11 +225,7 @@ public class AMQPMessage extends RefCountMessage {
this.expiration = System.currentTimeMillis() + _header.getTtl().intValue();
}
- if (!readApplicationProperties) {
- return;
- }
-
- if (buffer.hasRemaining() && readApplicationProperties) {
+ if (buffer.hasRemaining()) {
section = (Section) decoder.readObject();
} else {
section = null;
@@ -220,10 +234,6 @@ public class AMQPMessage extends RefCountMessage {
// meaning there is no header
headerEnd = 0;
}
-
- if (!readApplicationProperties) {
- return;
- }
if (section instanceof DeliveryAnnotations) {
_deliveryAnnotations = (DeliveryAnnotations) section;
@@ -254,6 +264,7 @@ public class AMQPMessage extends RefCountMessage {
}
}
+
if (section instanceof ApplicationProperties) {
applicationProperties = (ApplicationProperties) section;
}
@@ -785,7 +796,7 @@ public class AMQPMessage extends RefCountMessage {
@Override
public org.apache.activemq.artemis.api.core.Message toCore() {
MessageImpl protonMessage = getProtonMessage();
- return null;
+ throw new IllegalStateException("conversion between AMQP and Core not implemented yet!");
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ef95eaa8/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 81a83ac..d23185e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1079,6 +1079,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
Transaction tx = context.getTransaction();
+ Long deliveryTime = message.getScheduledDeliveryTime();
+
for (Map.Entry<SimpleString, RouteContextList> entry : context.getContexListing().entrySet()) {
PagingStore store = pagingManager.getPageStore(entry.getKey());
@@ -1095,12 +1097,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
for (Queue queue : entry.getValue().getNonDurableQueues()) {
MessageReference reference = MessageReference.Factory.createReference(message, queue);
- refs.add(reference);
- if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) {
- Long scheduledDeliveryTime = message.getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
-
- reference.setScheduledDeliveryTime(scheduledDeliveryTime);
+ if (deliveryTime != null) {
+ reference.setScheduledDeliveryTime(deliveryTime);
}
+ refs.add(reference);
message.incrementRefCount();
}
@@ -1119,13 +1119,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
}
- refs.add(reference);
-
- if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) {
- Long scheduledDeliveryTime = message.getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
- reference.setScheduledDeliveryTime(scheduledDeliveryTime);
+ if (deliveryTime != null) {
+ reference.setScheduledDeliveryTime(deliveryTime);
}
+ refs.add(reference);
if (message.isDurable()) {
int durableRefCount = message.incrementDurableRefCount();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ef95eaa8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
index 002b2c7..5b2bf28 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
@@ -365,7 +365,8 @@ public class ManagementServiceImpl implements ManagementService {
}
@Override
- public Message handleMessage(final Message message) throws Exception {
+ public Message handleMessage(Message message) throws Exception {
+ message = message.toCore();
// a reply message is sent with the result stored in the message body.
Message reply = new CoreMessage(storageManager.generateID(), 512);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ef95eaa8/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 4da2e63..5b44572 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -35,8 +35,8 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.Message;
-
import org.apache.activemq.artemis.api.core.RefCountMessage;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.encode.BodyType;
import org.apache.activemq.artemis.core.filter.Filter;
@@ -47,8 +47,6 @@ import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
-import org.apache.activemq.artemis.api.core.RoutingType;
-
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.LinkedListIterator;
@@ -293,6 +291,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
+ public Long getScheduledDeliveryTime() {
+ return null;
+ }
+
+ @Override
public void reloadPersistence(ActiveMQBuffer record) {
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ef95eaa8/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
index aa1bdc4..4640c33 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
@@ -16,28 +16,6 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.DELAYED_DELIVERY;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PRODUCT;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.VERSION;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.contains;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.lang.reflect.Field;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@@ -61,7 +39,24 @@ import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.config.Configuration;
@@ -71,14 +66,13 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
import org.apache.activemq.artemis.protocol.amqp.client.AMQPClientConnectionFactory;
import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientConnectionManager;
import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolManager;
-import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -105,6 +99,11 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.DELAYED_DELIVERY;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PRODUCT;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.VERSION;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.contains;
+
@RunWith(Parameterized.class)
public class ProtonTest extends ProtonTestBase {
@@ -379,7 +378,7 @@ public class ProtonTest extends ProtonTestBase {
receiver.flow(1);
// Shouldn't get this since we delayed the message.
- assertNull(receiver.receive(5, TimeUnit.SECONDS));
+ assertNull(receiver.receive(1, TimeUnit.SECONDS));
} finally {
connection.close();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ef95eaa8/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
index 442d6e9..40f2ebd 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
@@ -364,6 +364,11 @@ public class AcknowledgeTest extends ActiveMQTestBase {
}
@Override
+ public Long getScheduledDeliveryTime() {
+ return null;
+ }
+
+ @Override
public Message toCore() {
return this;
}