You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/03/03 01:05:16 UTC
[23/36] activemq-artemis git commit: fixing paging & flow control on
AMQP
fixing paging & flow control on AMQP
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/d44c0334
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/d44c0334
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/d44c0334
Branch: refs/heads/artemis-1009
Commit: d44c03347828a019143fddffec20d52ad37834f2
Parents: 6468186
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Feb 28 21:01:05 2017 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Mar 2 20:04:30 2017 -0500
----------------------------------------------------------------------
.../activemq/artemis/api/core/Message.java | 6 ++++
.../artemis/core/message/impl/CoreMessage.java | 14 --------
.../protocol/amqp/broker/AMQPMessage.java | 35 ++++++++++++++++----
3 files changed, 34 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d44c0334/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index 4a5381c..b266279 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -77,6 +77,12 @@ import org.apache.activemq.artemis.core.persistence.Persister;
*/
public interface Message {
+ // This is an estimate of how much memory a Message takes up, exclusing body and properties
+ // Note, it is only an estimate, it's not possible to be entirely sure with Java
+ // This figure is calculated using the test utilities in org.apache.activemq.tests.unit.util.sizeof
+ // The value is somewhat higher on 64 bit architectures, probably due to different alignment
+ int memoryOffset = 352;
+
SimpleString HDR_ROUTE_TO_IDS = new SimpleString("_AMQ_ROUTE_TO");
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d44c0334/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index fd09751..edbcaa9 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -45,15 +45,8 @@ public class CoreMessage extends RefCountMessage {
public static final int BUFFER_HEADER_SPACE = PacketImpl.PACKET_HEADERS_SIZE;
- // This is an estimate of how much memory a ServerMessageImpl takes up, exclusing body and properties
- // Note, it is only an estimate, it's not possible to be entirely sure with Java
- // This figure is calculated using the test utilities in org.apache.activemq.tests.unit.util.sizeof
- // The value is somewhat higher on 64 bit architectures, probably due to different alignment
- private static final int memoryOffset = 352;
-
private volatile int memoryEstimate = -1;
-
private static final Logger logger = Logger.getLogger(CoreMessage.class);
// There's an integer with the number of bytes for the body
@@ -351,16 +344,9 @@ public class CoreMessage extends RefCountMessage {
return this.properties;
}
-
@Override
public int getMemoryEstimate() {
if (memoryEstimate == -1) {
- if (buffer == null) {
- new Exception("It is null").printStackTrace();
- }
- if (properties == null) {
- new Exception("Properties It is null").printStackTrace();
- }
memoryEstimate = memoryOffset +
(buffer != null ? buffer.capacity() : 0) +
(properties != null ? properties.getMemoryOffset() : 0);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d44c0334/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index ee2f870..c530c94 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -34,6 +34,7 @@ import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.utils.DataConstants;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.amqp.messaging.Header;
@@ -48,6 +49,8 @@ import org.apache.qpid.proton.util.TLSEncoder;
// see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format
public class AMQPMessage extends RefCountMessage {
+ private volatile int memoryEstimate = -1;
+
final long messageFormat;
private ProtonProtocolManager protocolManager;
ByteBuf data;
@@ -192,6 +195,9 @@ public class AMQPMessage extends RefCountMessage {
} else {
section = null;
}
+ } else {
+ // meaning there is no header
+ headerEnd = 0;
}
if (!readApplicationProperties) {
@@ -257,27 +263,26 @@ public class AMQPMessage extends RefCountMessage {
this.data = null;
}
- // TODO-now this only make sense on Core
@Override
public ActiveMQBuffer getBodyBuffer() {
+ // NO-IMPL
return null;
}
- // TODO-now this only make sense on Core
@Override
public ActiveMQBuffer getReadOnlyBodyBuffer() {
+ // NO-IMPL
return null;
}
- // TODO: Refactor Large message
@Override
public LargeBodyEncoder getBodyEncoder() throws ActiveMQException {
+ // NO-IMPL
return null;
}
@Override
public byte getType() {
- // TODO-now: what to do here?
return type;
}
@@ -309,7 +314,6 @@ public class AMQPMessage extends RefCountMessage {
@Override
public org.apache.activemq.artemis.api.core.Message copy() {
- // TODO-now: what to do with this?
AMQPMessage newEncode = new AMQPMessage(this.messageFormat, data.array(), protocolManager);
return newEncode;
}
@@ -471,7 +475,19 @@ public class AMQPMessage extends RefCountMessage {
// I would send a new instance of Header with a new delivery count, and only send partial of the buffer
// previously received
checkBuffer();
- buffer.writeBytes(data);
+ Header header = getHeader();
+ if (header != null) {
+ synchronized (header) {
+ if (header.getDeliveryCount() != null) {
+ header.setDeliveryCount(UnsignedInteger.valueOf(header.getDeliveryCount().intValue() + 1));
+ } else {
+ header.setDeliveryCount(UnsignedInteger.valueOf(1));
+ }
+ TLSEncoder.getEncoder().setByteBuffer(new NettyWritable(buffer));
+ TLSEncoder.getEncoder().writeObject(header);
+ }
+ }
+ buffer.writeBytes(data, headerEnd, data.writerIndex() - headerEnd);
}
@Override
@@ -728,7 +744,12 @@ public class AMQPMessage extends RefCountMessage {
@Override
public int getMemoryEstimate() {
- return 0;
+ if (memoryEstimate == -1) {
+ memoryEstimate = memoryOffset +
+ (data != null ? data.capacity() : 0);
+ }
+
+ return memoryEstimate;
}
@Override