You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/10/09 10:46:46 UTC
qpid-broker-j git commit: QPID-7963: [Java Broker] [AMQP1.0] Improve
heap memory consumption on transferring large messages
Repository: qpid-broker-j
Updated Branches:
refs/heads/master cb28e4854 -> 3eb2426b0
QPID-7963: [Java Broker] [AMQP1.0] Improve heap memory consumption on transferring large messages
The changes are based on patch submitted by Rob Godfrey <rg...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/3eb2426b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/3eb2426b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/3eb2426b
Branch: refs/heads/master
Commit: 3eb2426b0f4f784817f64e032cd29057e8010272
Parents: cb28e48
Author: Alex Rudyy <or...@apache.org>
Authored: Mon Oct 9 11:25:02 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Mon Oct 9 11:25:02 2017 +0100
----------------------------------------------------------------------
.../protocol/v1_0/AMQPConnection_1_0Impl.java | 23 ++++----
.../protocol/v1_0/SendingLinkEndpoint.java | 3 +-
.../qpid/server/protocol/v1_0/Session_1_0.java | 58 ++++++++++----------
3 files changed, 44 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3eb2426b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
index 97a3081..c24b9a5 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
@@ -1214,17 +1214,20 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
int payloadSize = 0;
for(QpidByteBuffer buf : payload)
{
- if(payloadSize + buf.remaining() < maxPayloadSize)
+ if (buf.hasRemaining())
{
- payloadSize += buf.remaining();
- payloadDup.add(buf.duplicate());
- }
- else
- {
- QpidByteBuffer dup = buf.slice();
- dup.limit(maxPayloadSize-payloadSize);
- payloadDup.add(dup);
- break;
+ if (payloadSize + buf.remaining() < maxPayloadSize)
+ {
+ payloadSize += buf.remaining();
+ payloadDup.add(buf.duplicate());
+ }
+ else
+ {
+ QpidByteBuffer dup = buf.slice();
+ dup.limit(maxPayloadSize - payloadSize);
+ payloadDup.add(dup);
+ break;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3eb2426b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
index 6174f62..ec6939b 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
@@ -23,7 +23,6 @@ package org.apache.qpid.server.protocol.v1_0;
import java.security.AccessControlException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
@@ -389,7 +388,7 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
xfr.setHandle(getLocalHandle());
- s.sendTransfer(xfr, this, true);
+ s.sendTransfer(xfr, this);
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3eb2426b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index e3e013d..8a5a8ed 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -273,46 +273,52 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
end(new End());
}
- public void sendTransfer(final Transfer xfr, final SendingLinkEndpoint endpoint, final boolean newDelivery)
+ void sendTransfer(final Transfer xfr, final SendingLinkEndpoint endpoint)
{
_nextOutgoingId.incr();
- UnsignedInteger deliveryId;
final boolean settled = Boolean.TRUE.equals(xfr.getSettled());
- if (newDelivery)
+ UnsignedInteger deliveryId = UnsignedInteger.valueOf(_nextOutgoingDeliveryId++);
+ xfr.setDeliveryId(deliveryId);
+ if (!settled)
{
- deliveryId = UnsignedInteger.valueOf(_nextOutgoingDeliveryId++);
- xfr.setDeliveryId(deliveryId);
- if (!settled)
- {
- final UnsettledDelivery delivery = new UnsettledDelivery(xfr.getDeliveryTag(), endpoint);
- _outgoingDeliveryRegistry.addDelivery(deliveryId, delivery);
- }
+ final UnsettledDelivery delivery = new UnsettledDelivery(xfr.getDeliveryTag(), endpoint);
+ _outgoingDeliveryRegistry.addDelivery(deliveryId, delivery);
}
_remoteIncomingWindow--;
+ List<QpidByteBuffer> payload = xfr.getPayload();
try
{
- List<QpidByteBuffer> payload = xfr.getPayload();
- final long remaining = payload == null ? 0 : QpidByteBufferUtils.remaining(payload);
+ long remaining = payload == null ? 0 : QpidByteBufferUtils.remaining(payload);
int payloadSent = _connection.sendFrame(_sendingChannel, xfr, payload);
-
- if(payload != null && payloadSent < remaining && payloadSent >= 0)
+ if(payload != null)
{
- // TODO - should make this iterative and not recursive
+ while (payloadSent < remaining && payloadSent >= 0)
+ {
+ Transfer continuationTransfer = new Transfer();
- Transfer secondTransfer = new Transfer();
+ continuationTransfer.setHandle(xfr.getHandle());
+ continuationTransfer.setRcvSettleMode(xfr.getRcvSettleMode());
+ continuationTransfer.setState(xfr.getState());
+ continuationTransfer.setPayload(payload);
- secondTransfer.setHandle(xfr.getHandle());
- secondTransfer.setRcvSettleMode(xfr.getRcvSettleMode());
- secondTransfer.setState(xfr.getState());
- secondTransfer.setPayload(payload);
+ _nextOutgoingId.incr();
+ _remoteIncomingWindow--;
- sendTransfer(secondTransfer, endpoint, false);
+ remaining = QpidByteBufferUtils.remaining(payload);
+ payloadSent = _connection.sendFrame(_sendingChannel, continuationTransfer, payload);
- secondTransfer.dispose();
+ continuationTransfer.dispose();
+ }
}
-
- if (payload != null)
+ }
+ catch (OversizeFrameException e)
+ {
+ throw new ConnectionScopedRuntimeException(e);
+ }
+ finally
+ {
+ if(payload != null)
{
for (QpidByteBuffer buf : payload)
{
@@ -320,10 +326,6 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
}
}
}
- catch (OversizeFrameException e)
- {
- throw new ConnectionScopedRuntimeException(e);
- }
}
public boolean isActive()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org