You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/03/06 23:44:29 UTC
[1/2] activemq-artemis git commit: This closes #1928
Repository: activemq-artemis
Updated Branches:
refs/heads/master a1c39cdad -> cc4a13a37
This closes #1928
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/cc4a13a3
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/cc4a13a3
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/cc4a13a3
Branch: refs/heads/master
Commit: cc4a13a3770374a64f992c5b5b2abd87572fa5a7
Parents: a1c39cd 169d0b7
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Mar 6 18:44:22 2018 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Mar 6 18:44:22 2018 -0500
----------------------------------------------------------------------
.../protocol/amqp/broker/AMQPMessage.java | 107 ++++++++++++++++---
.../amqp/proton/ProtonServerSenderContext.java | 23 ++--
2 files changed, 105 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
[2/2] activemq-artemis git commit: ARTEMIS-1722 Don't copy message
bytes unless needed
Posted by cl...@apache.org.
ARTEMIS-1722 Don't copy message bytes unless needed
Alternate patch that doesn't copy the message bytes unless doing a
redelivery or skipping delivery annotations in the original version of
the message. Proton-J will copy the bytes provided to the Sender's send
method so a copy isn't necessary on most common sends.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/169d0b7f
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/169d0b7f
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/169d0b7f
Branch: refs/heads/master
Commit: 169d0b7fa7852efecf77048a06fd9be89d749064
Parents: a1c39cd
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Mar 2 15:03:08 2018 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Mar 6 18:44:22 2018 -0500
----------------------------------------------------------------------
.../protocol/amqp/broker/AMQPMessage.java | 107 ++++++++++++++++---
.../amqp/proton/ProtonServerSenderContext.java | 23 ++--
2 files changed, 105 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/169d0b7f/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 56b0e6c..ba87ae6 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -665,16 +665,20 @@ public class AMQPMessage extends RefCountMessage {
private synchronized void checkBuffer() {
if (!bufferValid) {
- int estimated = Math.max(1500, data != null ? data.capacity() + 1000 : 0);
- ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(estimated);
- try {
- getProtonMessage().encode(new NettyWritable(buffer));
- byte[] bytes = new byte[buffer.writerIndex()];
- buffer.readBytes(bytes);
- this.data = Unpooled.wrappedBuffer(bytes);
- } finally {
- buffer.release();
- }
+ encodeProtonMessage();
+ }
+ }
+
+ private void encodeProtonMessage() {
+ int estimated = Math.max(1500, data != null ? data.capacity() + 1000 : 0);
+ ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(estimated);
+ try {
+ getProtonMessage().encode(new NettyWritable(buffer));
+ byte[] bytes = new byte[buffer.writerIndex()];
+ buffer.readBytes(bytes);
+ this.data = Unpooled.wrappedBuffer(bytes);
+ } finally {
+ buffer.release();
}
}
@@ -691,15 +695,16 @@ public class AMQPMessage extends RefCountMessage {
int amqpDeliveryCount = deliveryCount - 1;
- Header header = getHeader();
- if (header == null && (amqpDeliveryCount > 0)) {
- header = new Header();
- header.setDurable(durable);
- }
-
// If the re-delivering the message then the header must be re-encoded
// otherwise we want to write the original header if present.
if (amqpDeliveryCount > 0) {
+
+ Header header = getHeader();
+ if (header == null) {
+ header = new Header();
+ header.setDurable(durable);
+ }
+
synchronized (header) {
header.setDeliveryCount(UnsignedInteger.valueOf(amqpDeliveryCount));
TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buffer));
@@ -713,6 +718,76 @@ public class AMQPMessage extends RefCountMessage {
buffer.writeBytes(data, messagePaylodStart, data.writerIndex() - messagePaylodStart);
}
+ /**
+ * Gets a ByteBuf from the Message that contains the encoded bytes to be sent on the wire.
+ * <p>
+ * When possible this method will present the bytes to the caller without copying them into
+ * another buffer copy. If copying is needed a new Netty buffer is created and returned. The
+ * caller should ensure that the reference count on the returned buffer is always decremented
+ * to avoid a leak in the case of a copied buffer being returned.
+ *
+ * @param deliveryCount
+ * The new delivery count for this message.
+ *
+ * @return a Netty ByteBuf containing the encoded bytes of this Message instance.
+ */
+ public ByteBuf getSendBuffer(int deliveryCount) {
+ checkBuffer();
+
+ if (deliveryCount > 1) {
+ return createCopyWithNewDeliveryCount(deliveryCount);
+ } else if (headerEnds != messagePaylodStart) {
+ return createCopyWithoutDeliveryAnnotations();
+ } else {
+ // Common case message has no delivery annotations and this is the first delivery
+ // so no re-encoding or section skipping needed.
+ return data.retainedDuplicate();
+ }
+ }
+
+ private ByteBuf createCopyWithoutDeliveryAnnotations() {
+ assert headerEnds != messagePaylodStart;
+
+ // The original message had delivery annotations and so we must copy into a new
+ // buffer skipping the delivery annotations section as that is not meant to survive
+ // beyond this hop.
+ final ByteBuf result = PooledByteBufAllocator.DEFAULT.heapBuffer(getEncodeSize());
+ result.writeBytes(data, 0, headerEnds);
+ result.writeBytes(data, messagePaylodStart, data.writerIndex() - messagePaylodStart);
+ return result;
+ }
+
+ private ByteBuf createCopyWithNewDeliveryCount(int deliveryCount) {
+ assert deliveryCount > 1;
+
+ final int amqpDeliveryCount = deliveryCount - 1;
+ // If the re-delivering the message then the header must be re-encoded
+ // (or created if not previously present). Any delivery annotations should
+ // be skipped as well in the resulting buffer.
+
+ final ByteBuf result = PooledByteBufAllocator.DEFAULT.heapBuffer(getEncodeSize());
+
+ Header header = getHeader();
+ if (header == null) {
+ header = new Header();
+ header.setDurable(durable);
+ }
+
+ synchronized (header) {
+ // Updates or adds a Header section with the correct delivery count
+ header.setDeliveryCount(UnsignedInteger.valueOf(amqpDeliveryCount));
+ TLSEncode.getEncoder().setByteBuffer(new NettyWritable(result));
+ TLSEncode.getEncoder().writeObject(header);
+ TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
+ }
+
+ // This will skip any existing delivery annotations that might have been present
+ // in the original message.
+ result.writeBytes(data, messagePaylodStart, data.writerIndex() - messagePaylodStart);
+
+ return result;
+ }
+
public TypedProperties createExtraProperties() {
if (extraProperties == null) {
extraProperties = new TypedProperties();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/169d0b7f/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 1823168..990a217 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -22,8 +22,6 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
@@ -48,6 +46,7 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFound
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionImpl;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.selector.filter.FilterException;
import org.apache.activemq.artemis.selector.impl.SelectorParser;
@@ -75,6 +74,8 @@ import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Sender;
import org.jboss.logging.Logger;
+import io.netty.buffer.ByteBuf;
+
/**
* TODO: Merge {@link ProtonServerSenderContext} and {@link org.apache.activemq.artemis.protocol.amqp.client.ProtonClientSenderContext} once we support 'global' link names. The split is a workaround for outgoing links
*/
@@ -690,11 +691,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
// we only need a tag if we are going to settle later
byte[] tag = preSettle ? new byte[0] : protonSession.getTag();
- ByteBuf nettyBuffer = PooledByteBufAllocator.DEFAULT.heapBuffer(message.getEncodeSize());
- try {
- message.sendBuffer(nettyBuffer, deliveryCount);
+ // Let the Message decide how to present the message bytes
+ ByteBuf sendBuffer = message.getSendBuffer(deliveryCount);
- int size = nettyBuffer.writerIndex();
+ try {
+ int size = sendBuffer.writerIndex();
while (!connection.tryLock(1, TimeUnit.SECONDS)) {
if (closed || sender.getLocalState() == EndpointState.CLOSED) {
@@ -714,8 +715,12 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
delivery.setMessageFormat((int) message.getMessageFormat());
delivery.setContext(messageReference);
- // this will avoid a copy.. patch provided by Norman using buffer.array()
- sender.send(nettyBuffer.array(), nettyBuffer.arrayOffset() + nettyBuffer.readerIndex(), nettyBuffer.readableBytes());
+ if (sendBuffer.hasArray()) {
+ // this will avoid a copy.. patch provided by Norman using buffer.array()
+ sender.send(sendBuffer.array(), sendBuffer.arrayOffset() + sendBuffer.readerIndex(), sendBuffer.readableBytes());
+ } else {
+ sender.send(new NettyReadable(sendBuffer));
+ }
if (preSettle) {
// Presettled means the client implicitly accepts any delivery we send it.
@@ -731,7 +736,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
return size;
} finally {
- nettyBuffer.release();
+ sendBuffer.release();
}
}