You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/10/09 10:46:46 UTC

qpid-broker-j git commit: QPID-7963: [Java Broker] [AMQP1.0] Improve heap memory consumption on transferring large messages

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master cb28e4854 -> 3eb2426b0


QPID-7963: [Java Broker] [AMQP1.0] Improve heap memory consumption on transferring large messages

The changes are based on patch submitted by Rob Godfrey <rg...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/3eb2426b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/3eb2426b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/3eb2426b

Branch: refs/heads/master
Commit: 3eb2426b0f4f784817f64e032cd29057e8010272
Parents: cb28e48
Author: Alex Rudyy <or...@apache.org>
Authored: Mon Oct 9 11:25:02 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Mon Oct 9 11:25:02 2017 +0100

----------------------------------------------------------------------
 .../protocol/v1_0/AMQPConnection_1_0Impl.java   | 23 ++++----
 .../protocol/v1_0/SendingLinkEndpoint.java      |  3 +-
 .../qpid/server/protocol/v1_0/Session_1_0.java  | 58 ++++++++++----------
 3 files changed, 44 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3eb2426b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
index 97a3081..c24b9a5 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
@@ -1214,17 +1214,20 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
                     int payloadSize = 0;
                     for(QpidByteBuffer buf : payload)
                     {
-                        if(payloadSize + buf.remaining() < maxPayloadSize)
+                        if (buf.hasRemaining())
                         {
-                            payloadSize += buf.remaining();
-                            payloadDup.add(buf.duplicate());
-                        }
-                        else
-                        {
-                            QpidByteBuffer dup = buf.slice();
-                            dup.limit(maxPayloadSize-payloadSize);
-                            payloadDup.add(dup);
-                            break;
+                            if (payloadSize + buf.remaining() < maxPayloadSize)
+                            {
+                                payloadSize += buf.remaining();
+                                payloadDup.add(buf.duplicate());
+                            }
+                            else
+                            {
+                                QpidByteBuffer dup = buf.slice();
+                                dup.limit(maxPayloadSize - payloadSize);
+                                payloadDup.add(dup);
+                                break;
+                            }
                         }
                     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3eb2426b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
index 6174f62..ec6939b 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
@@ -23,7 +23,6 @@ package org.apache.qpid.server.protocol.v1_0;
 
 import java.security.AccessControlException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -389,7 +388,7 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
 
         xfr.setHandle(getLocalHandle());
 
-        s.sendTransfer(xfr, this, true);
+        s.sendTransfer(xfr, this);
     }
 
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3eb2426b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index e3e013d..8a5a8ed 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -273,46 +273,52 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
         end(new End());
     }
 
-    public void sendTransfer(final Transfer xfr, final SendingLinkEndpoint endpoint, final boolean newDelivery)
+    void sendTransfer(final Transfer xfr, final SendingLinkEndpoint endpoint)
     {
         _nextOutgoingId.incr();
-        UnsignedInteger deliveryId;
         final boolean settled = Boolean.TRUE.equals(xfr.getSettled());
-        if (newDelivery)
+        UnsignedInteger deliveryId = UnsignedInteger.valueOf(_nextOutgoingDeliveryId++);
+        xfr.setDeliveryId(deliveryId);
+        if (!settled)
         {
-            deliveryId = UnsignedInteger.valueOf(_nextOutgoingDeliveryId++);
-            xfr.setDeliveryId(deliveryId);
-            if (!settled)
-            {
-                final UnsettledDelivery delivery = new UnsettledDelivery(xfr.getDeliveryTag(), endpoint);
-                _outgoingDeliveryRegistry.addDelivery(deliveryId, delivery);
-            }
+            final UnsettledDelivery delivery = new UnsettledDelivery(xfr.getDeliveryTag(), endpoint);
+            _outgoingDeliveryRegistry.addDelivery(deliveryId, delivery);
         }
 
         _remoteIncomingWindow--;
+        List<QpidByteBuffer> payload = xfr.getPayload();
         try
         {
-            List<QpidByteBuffer> payload = xfr.getPayload();
-            final long remaining = payload == null ? 0 : QpidByteBufferUtils.remaining(payload);
+            long remaining = payload == null ? 0 : QpidByteBufferUtils.remaining(payload);
             int payloadSent = _connection.sendFrame(_sendingChannel, xfr, payload);
-
-            if(payload != null && payloadSent < remaining && payloadSent >= 0)
+            if(payload != null)
             {
-                // TODO - should make this iterative and not recursive
+                while (payloadSent < remaining && payloadSent >= 0)
+                {
+                    Transfer continuationTransfer = new Transfer();
 
-                Transfer secondTransfer = new Transfer();
+                    continuationTransfer.setHandle(xfr.getHandle());
+                    continuationTransfer.setRcvSettleMode(xfr.getRcvSettleMode());
+                    continuationTransfer.setState(xfr.getState());
+                    continuationTransfer.setPayload(payload);
 
-                secondTransfer.setHandle(xfr.getHandle());
-                secondTransfer.setRcvSettleMode(xfr.getRcvSettleMode());
-                secondTransfer.setState(xfr.getState());
-                secondTransfer.setPayload(payload);
+                    _nextOutgoingId.incr();
+                    _remoteIncomingWindow--;
 
-                sendTransfer(secondTransfer, endpoint, false);
+                    remaining = QpidByteBufferUtils.remaining(payload);
+                    payloadSent = _connection.sendFrame(_sendingChannel, continuationTransfer, payload);
 
-                secondTransfer.dispose();
+                    continuationTransfer.dispose();
+                }
             }
-
-            if (payload != null)
+        }
+        catch (OversizeFrameException e)
+        {
+            throw new ConnectionScopedRuntimeException(e);
+        }
+        finally
+        {
+            if(payload != null)
             {
                 for (QpidByteBuffer buf : payload)
                 {
@@ -320,10 +326,6 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
                 }
             }
         }
-        catch (OversizeFrameException e)
-        {
-            throw new ConnectionScopedRuntimeException(e);
-        }
     }
 
     public boolean isActive()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org