You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/03/03 01:05:16 UTC

[23/36] activemq-artemis git commit: fixing paging & flow control on AMQP

fixing paging & flow control on AMQP


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/d44c0334
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/d44c0334
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/d44c0334

Branch: refs/heads/artemis-1009
Commit: d44c03347828a019143fddffec20d52ad37834f2
Parents: 6468186
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Feb 28 21:01:05 2017 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Mar 2 20:04:30 2017 -0500

----------------------------------------------------------------------
 .../activemq/artemis/api/core/Message.java      |  6 ++++
 .../artemis/core/message/impl/CoreMessage.java  | 14 --------
 .../protocol/amqp/broker/AMQPMessage.java       | 35 ++++++++++++++++----
 3 files changed, 34 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d44c0334/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index 4a5381c..b266279 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -77,6 +77,12 @@ import org.apache.activemq.artemis.core.persistence.Persister;
  */
 public interface Message {
 
+   // This is an estimate of how much memory a Message takes up, excluding body and properties
+   // Note, it is only an estimate, it's not possible to be entirely sure with Java
+   // This figure is calculated using the test utilities in org.apache.activemq.tests.unit.util.sizeof
+   // The value is somewhat higher on 64 bit architectures, probably due to different alignment
+   int memoryOffset = 352;
+
 
    SimpleString HDR_ROUTE_TO_IDS = new SimpleString("_AMQ_ROUTE_TO");
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d44c0334/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index fd09751..edbcaa9 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -45,15 +45,8 @@ public class CoreMessage extends RefCountMessage {
 
    public static final int BUFFER_HEADER_SPACE = PacketImpl.PACKET_HEADERS_SIZE;
 
-   // This is an estimate of how much memory a ServerMessageImpl takes up, exclusing body and properties
-   // Note, it is only an estimate, it's not possible to be entirely sure with Java
-   // This figure is calculated using the test utilities in org.apache.activemq.tests.unit.util.sizeof
-   // The value is somewhat higher on 64 bit architectures, probably due to different alignment
-   private static final int memoryOffset = 352;
-
    private volatile int memoryEstimate = -1;
 
-
    private static final Logger logger = Logger.getLogger(CoreMessage.class);
 
    // There's an integer with the number of bytes for the body
@@ -351,16 +344,9 @@ public class CoreMessage extends RefCountMessage {
       return this.properties;
    }
 
-
    @Override
    public int getMemoryEstimate() {
       if (memoryEstimate == -1) {
-         if (buffer == null) {
-            new Exception("It is null").printStackTrace();
-         }
-         if (properties == null) {
-            new Exception("Properties It is null").printStackTrace();
-         }
          memoryEstimate = memoryOffset +
             (buffer != null ? buffer.capacity() : 0) +
             (properties != null ? properties.getMemoryOffset() : 0);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d44c0334/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index ee2f870..c530c94 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -34,6 +34,7 @@ import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
 import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
 import org.apache.activemq.artemis.utils.DataConstants;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
 import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
 import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
 import org.apache.qpid.proton.amqp.messaging.Header;
@@ -48,6 +49,8 @@ import org.apache.qpid.proton.util.TLSEncoder;
 // see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format
 public class AMQPMessage extends RefCountMessage {
 
+   private volatile int memoryEstimate = -1;
+
    final long messageFormat;
    private ProtonProtocolManager protocolManager;
    ByteBuf data;
@@ -192,6 +195,9 @@ public class AMQPMessage extends RefCountMessage {
             } else {
                section = null;
             }
+         } else {
+            // meaning there is no header
+            headerEnd = 0;
          }
 
          if (!readApplicationProperties) {
@@ -257,27 +263,26 @@ public class AMQPMessage extends RefCountMessage {
       this.data = null;
    }
 
-   // TODO-now this only make sense on Core
    @Override
    public ActiveMQBuffer getBodyBuffer() {
+      // NO-IMPL
       return null;
    }
 
-   // TODO-now this only make sense on Core
    @Override
    public ActiveMQBuffer getReadOnlyBodyBuffer() {
+      // NO-IMPL
       return null;
    }
 
-   // TODO: Refactor Large message
    @Override
    public LargeBodyEncoder getBodyEncoder() throws ActiveMQException {
+      // NO-IMPL
       return null;
    }
 
    @Override
    public byte getType() {
-      // TODO-now: what to do here?
       return type;
    }
 
@@ -309,7 +314,6 @@ public class AMQPMessage extends RefCountMessage {
 
    @Override
    public org.apache.activemq.artemis.api.core.Message copy() {
-      // TODO-now: what to do with this?
       AMQPMessage newEncode = new AMQPMessage(this.messageFormat, data.array(), protocolManager);
       return newEncode;
    }
@@ -471,7 +475,19 @@ public class AMQPMessage extends RefCountMessage {
       //       I would send a new instance of Header with a new delivery count, and only send partial of the buffer
       //       previously received
       checkBuffer();
-      buffer.writeBytes(data);
+      Header header = getHeader();
+      if (header != null) {
+         synchronized (header) {
+            if (header.getDeliveryCount() != null) {
+               header.setDeliveryCount(UnsignedInteger.valueOf(header.getDeliveryCount().intValue() + 1));
+            } else {
+               header.setDeliveryCount(UnsignedInteger.valueOf(1));
+            }
+            TLSEncoder.getEncoder().setByteBuffer(new NettyWritable(buffer));
+            TLSEncoder.getEncoder().writeObject(header);
+         }
+      }
+      buffer.writeBytes(data, headerEnd, data.writerIndex() - headerEnd);
    }
 
    @Override
@@ -728,7 +744,12 @@ public class AMQPMessage extends RefCountMessage {
 
    @Override
    public int getMemoryEstimate() {
-      return 0;
+      if (memoryEstimate == -1) {
+         memoryEstimate = memoryOffset +
+            (data != null ? data.capacity() : 0);
+      }
+
+      return memoryEstimate;
    }
 
    @Override