You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/03/03 01:05:26 UTC

[33/36] activemq-artemis git commit: Fixing Scheduled Message

Fixing Scheduled Message


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ef95eaa8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ef95eaa8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ef95eaa8

Branch: refs/heads/artemis-1009
Commit: ef95eaa848bb8d73271d60c2030be336c3862713
Parents: 4246128
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Mar 1 17:43:51 2017 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Mar 2 20:04:30 2017 -0500

----------------------------------------------------------------------
 .../activemq/artemis/api/core/Message.java      |  3 +
 .../artemis/core/message/impl/CoreMessage.java  | 11 ++++
 .../protocol/amqp/broker/AMQPMessage.java       | 69 ++++++++++++--------
 .../core/postoffice/impl/PostOfficeImpl.java    | 18 +++--
 .../management/impl/ManagementServiceImpl.java  |  3 +-
 .../impl/ScheduledDeliveryHandlerTest.java      |  9 ++-
 .../tests/integration/amqp/ProtonTest.java      | 49 +++++++-------
 .../integration/client/AcknowledgeTest.java     |  5 ++
 8 files changed, 99 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ef95eaa8/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index d96f232..b08202d 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -169,6 +169,9 @@ public interface Message {
 
    void messageChanged();
 
+   /** Returns the time at which this message is scheduled for delivery.
+    *  Returns null if the message is not scheduled. */
+   Long getScheduledDeliveryTime();
 
    /** Used for Large messages on Core.
     *  Do not use this, it will go away

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ef95eaa8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index 513b758..f620a1d 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -153,6 +153,17 @@ public class CoreMessage extends RefCountMessage {
       }
    }
 
+   /** Returns HDR_SCHEDULED_DELIVERY_TIME as a Long, or null unless its value is a Number. */
+   @Override
+   public Long getScheduledDeliveryTime() {
+      Object scheduled = getObjectProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
+
+      if (!(scheduled instanceof Number)) {
+         return null;
+      }
+      return ((Number) scheduled).longValue();
+   }
+
    /**
     * {@inheritDoc}
     */

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ef95eaa8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index d39bf9d..772f2cd 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -37,6 +37,7 @@ import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
 import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
 import org.apache.activemq.artemis.utils.DataConstants;
+import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
 import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
 import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
@@ -63,6 +64,7 @@ public class AMQPMessage extends RefCountMessage {
    private long expiration = 0;
    // this can be used to encode the header again and the rest of the message buffer
    private int headerEnd = -1;
+   private boolean parsedHeaders = false;
    private Header _header;
    private DeliveryAnnotations _deliveryAnnotations;
    private MessageAnnotations _messageAnnotations;
@@ -142,39 +144,55 @@ public class AMQPMessage extends RefCountMessage {
    }
 
    private ApplicationProperties getApplicationProperties() {
-      if (applicationProperties == null) {
-         if (data != null) {
-            partialDecode(data.nioBuffer(), true);
-         } else {
-            initalizeObjects();
-         }
-      }
-
+      parseHeaders();
       return applicationProperties;
    }
 
-   public Header getHeader() {
-      if (_header == null) {
+   private void parseHeaders() {
+      if (!parsedHeaders) {
          if (data == null) {
             initalizeObjects();
          } else {
-            partialDecode(this.data.nioBuffer(), false);
+            partialDecode(data.nioBuffer());
          }
+         parsedHeaders = true;
       }
+   }
 
+   public MessageAnnotations getMessageAnnotations() {
+      parseHeaders();
+      return _messageAnnotations;
+   }
+
+   public Header getHeader() {
+      parseHeaders();
       return _header;
    }
 
    public Properties getProperties() {
-      if (_properties == null) {
-         if (data == null) {
-            initalizeObjects();
-         } else {
-            partialDecode(this.data.nioBuffer(), true);
-         }
+      parseHeaders();
+      return _properties;
+   }
+
+   /** Returns the message annotation stored under {@code symbol}, or null when there is none. */
+   private Object getSymbol(String symbol) {
+      MessageAnnotations annotations = getMessageAnnotations();
+      if (annotations != null && annotations.getValue() != null) {
+         return annotations.getValue().get(Symbol.getSymbol(symbol));
       }
 
-      return _properties;
+      return null;
+   }
+
+   /** Returns the "x-opt-delivery-time" annotation as a Long, or null unless it is a Number. */
+   @Override
+   public Long getScheduledDeliveryTime() {
+      Object deliveryTime = getSymbol("x-opt-delivery-time");
+
+      if (!(deliveryTime instanceof Number)) {
+         return null;
+      }
+      return ((Number) deliveryTime).longValue();
    }
 
    @Override
@@ -182,7 +200,7 @@ public class AMQPMessage extends RefCountMessage {
       return AMQPMessagePersister.getInstance();
    }
 
-   private synchronized void partialDecode(ByteBuffer buffer, boolean readApplicationProperties) {
+   private synchronized void partialDecode(ByteBuffer buffer) {
       DecoderImpl decoder = TLSEncode.getDecoder();
       decoder.setByteBuffer(buffer);
       buffer.position(0);
@@ -207,11 +225,7 @@ public class AMQPMessage extends RefCountMessage {
                this.expiration = System.currentTimeMillis() + _header.getTtl().intValue();
             }
 
-            if (!readApplicationProperties) {
-               return;
-            }
-
-            if (buffer.hasRemaining() && readApplicationProperties) {
+            if (buffer.hasRemaining()) {
                section = (Section) decoder.readObject();
             } else {
                section = null;
@@ -220,10 +234,6 @@ public class AMQPMessage extends RefCountMessage {
             // meaning there is no header
             headerEnd = 0;
          }
-
-         if (!readApplicationProperties) {
-            return;
-         }
          if (section instanceof DeliveryAnnotations) {
             _deliveryAnnotations = (DeliveryAnnotations) section;
 
@@ -254,6 +264,7 @@ public class AMQPMessage extends RefCountMessage {
             }
 
          }
+
          if (section instanceof ApplicationProperties) {
             applicationProperties = (ApplicationProperties) section;
          }
@@ -785,7 +796,7 @@ public class AMQPMessage extends RefCountMessage {
    @Override
    public org.apache.activemq.artemis.api.core.Message toCore() {
       MessageImpl protonMessage = getProtonMessage();
-      return null;
+      throw new IllegalStateException("conversion between AMQP and Core not implemented yet!");
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ef95eaa8/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 81a83ac..d23185e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1079,6 +1079,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
 
       Transaction tx = context.getTransaction();
 
+      Long deliveryTime = message.getScheduledDeliveryTime();
+
       for (Map.Entry<SimpleString, RouteContextList> entry : context.getContexListing().entrySet()) {
          PagingStore store = pagingManager.getPageStore(entry.getKey());
 
@@ -1095,12 +1097,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
          for (Queue queue : entry.getValue().getNonDurableQueues()) {
             MessageReference reference = MessageReference.Factory.createReference(message, queue);
 
-            refs.add(reference);
-            if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) {
-               Long scheduledDeliveryTime = message.getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
-
-               reference.setScheduledDeliveryTime(scheduledDeliveryTime);
+            if (deliveryTime != null) {
+               reference.setScheduledDeliveryTime(deliveryTime);
             }
+            refs.add(reference);
 
             message.incrementRefCount();
          }
@@ -1119,13 +1119,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
                }
             }
 
-            refs.add(reference);
-
-            if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) {
-               Long scheduledDeliveryTime = message.getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
 
-               reference.setScheduledDeliveryTime(scheduledDeliveryTime);
+            if (deliveryTime != null) {
+               reference.setScheduledDeliveryTime(deliveryTime);
             }
+            refs.add(reference);
 
             if (message.isDurable()) {
                int durableRefCount = message.incrementDurableRefCount();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ef95eaa8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
index 002b2c7..5b2bf28 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
@@ -365,7 +365,8 @@ public class ManagementServiceImpl implements ManagementService {
    }
 
    @Override
-   public Message handleMessage(final Message message) throws Exception {
+   public Message handleMessage(Message message) throws Exception {
+      message = message.toCore();
       // a reply message is sent with the result stored in the message body.
       Message reply = new CoreMessage(storageManager.generateID(), 512);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ef95eaa8/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 4da2e63..5b44572 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -35,8 +35,8 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
 import org.apache.activemq.artemis.api.core.Message;
-
 import org.apache.activemq.artemis.api.core.RefCountMessage;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.encode.BodyType;
 import org.apache.activemq.artemis.core.filter.Filter;
@@ -47,8 +47,6 @@ import org.apache.activemq.artemis.core.server.Consumer;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RoutingContext;
-import org.apache.activemq.artemis.api.core.RoutingType;
-
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
 import org.apache.activemq.artemis.utils.LinkedListIterator;
@@ -293,6 +291,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
+      public Long getScheduledDeliveryTime() {
+         return null;
+      }
+
+      @Override
       public void reloadPersistence(ActiveMQBuffer record) {
 
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ef95eaa8/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
index aa1bdc4..4640c33 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
@@ -16,28 +16,6 @@
  */
 package org.apache.activemq.artemis.tests.integration.amqp;
 
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.DELAYED_DELIVERY;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PRODUCT;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.VERSION;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.contains;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.lang.reflect.Field;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
@@ -61,7 +39,24 @@ import javax.jms.Topic;
 import javax.jms.TopicPublisher;
 import javax.jms.TopicSession;
 import javax.jms.TopicSubscriber;
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.management.ResourceNames;
 import org.apache.activemq.artemis.core.config.Configuration;
@@ -71,14 +66,13 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
 import org.apache.activemq.artemis.protocol.amqp.client.AMQPClientConnectionFactory;
 import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientConnectionManager;
 import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolManager;
-import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
 import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
 import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -105,6 +99,11 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.DELAYED_DELIVERY;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PRODUCT;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.VERSION;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.contains;
+
 @RunWith(Parameterized.class)
 public class ProtonTest extends ProtonTestBase {
 
@@ -379,7 +378,7 @@ public class ProtonTest extends ProtonTestBase {
          receiver.flow(1);
 
          // Shouldn't get this since we delayed the message.
-         assertNull(receiver.receive(5, TimeUnit.SECONDS));
+         assertNull(receiver.receive(1, TimeUnit.SECONDS));
       } finally {
          connection.close();
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ef95eaa8/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
index 442d6e9..40f2ebd 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
@@ -364,6 +364,11 @@ public class AcknowledgeTest extends ActiveMQTestBase {
       }
 
       @Override
+      public Long getScheduledDeliveryTime() {
+         return null;
+      }
+
+      @Override
       public Message toCore() {
          return this;
       }