You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2018/04/13 15:09:00 UTC
qpid-proton-j git commit: PROTON-1828: add ability limit outgoing
frame sizes
Repository: qpid-proton-j
Updated Branches:
refs/heads/master 84b3ce477 -> e5a7dcade
PROTON-1828: add ability limit outgoing frame sizes
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton-j/commit/e5a7dcad
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton-j/tree/e5a7dcad
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton-j/diff/e5a7dcad
Branch: refs/heads/master
Commit: e5a7dcade2996b2b68967949ddf1377f954bf579
Parents: 84b3ce4
Author: Robbie Gemmell <ro...@apache.org>
Authored: Fri Apr 13 16:03:27 2018 +0100
Committer: Robbie Gemmell <ro...@apache.org>
Committed: Fri Apr 13 16:03:27 2018 +0100
----------------------------------------------------------------------
.../apache/qpid/proton/engine/Transport.java | 13 ++
.../qpid/proton/engine/impl/TransportImpl.java | 20 ++-
.../proton/engine/impl/TransportImplTest.java | 130 ++++++++++++++++++-
3 files changed, 161 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/e5a7dcad/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java
index 35b2d50..f8de042 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java
@@ -308,4 +308,17 @@ public interface Transport extends Endpoint
void setEmitFlowEventOnSend(boolean emitFlowEventOnSend);
boolean isEmitFlowEventOnSend();
+
+ /**
+ * Set an upper limit on the size of outgoing frames that will be sent
+ * to the peer. Allows constraining the transport not to emit Transfer
+ * frames over a given size even when the peer's max frame size allows it.
+ *
+ * Must be set before receiving the peer's Open frame to have effect.
+ *
+ * @param size the size limit to apply
+ */
+ void setOutboundFrameSizeLimit(int size);
+
+ int getOutboundFrameSizeLimit();
}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/e5a7dcad/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
index 1d0103e..afadb5f 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
@@ -104,6 +104,7 @@ public class TransportImpl extends EndpointImpl
private int _maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
private int _remoteMaxFrameSize = MIN_MAX_FRAME_SIZE;
+ private int _outboundFrameSizeLimit = 0;
private int _channelMax = CHANNEL_MAX_LIMIT;
private int _remoteChannelMax = CHANNEL_MAX_LIMIT;
@@ -1105,12 +1106,19 @@ public class TransportImpl extends EndpointImpl
_open = open;
}
+ int effectiveMaxFrameSize = _remoteMaxFrameSize;
if(open.getMaxFrameSize().longValue() > 0)
{
_remoteMaxFrameSize = (int) open.getMaxFrameSize().longValue();
- _frameWriter.setMaxFrameSize(_remoteMaxFrameSize);
+ effectiveMaxFrameSize = (int) Math.min(open.getMaxFrameSize().longValue(), Integer.MAX_VALUE);
}
+ if(_outboundFrameSizeLimit > 0) {
+ effectiveMaxFrameSize = (int) Math.min(open.getMaxFrameSize().longValue(), _outboundFrameSizeLimit);
+ }
+
+ _frameWriter.setMaxFrameSize(effectiveMaxFrameSize);
+
if (open.getChannelMax().longValue() > 0)
{
_remoteChannelMax = (int) open.getChannelMax().longValue();
@@ -1779,4 +1787,14 @@ public class TransportImpl extends EndpointImpl
_additionalTransportLayers.add(layer);
}
}
+
+ @Override
+ public void setOutboundFrameSizeLimit(int limit) {
+ _outboundFrameSizeLimit = limit;
+ }
+
+ @Override
+ public int getOutboundFrameSizeLimit() {
+ return _outboundFrameSizeLimit;
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/e5a7dcad/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
----------------------------------------------------------------------
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
index ead411f..50c04fd 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
@@ -34,6 +34,7 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
+import java.util.Random;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
@@ -78,7 +79,7 @@ public class TransportImplTest
private static final TransportFrame TRANSPORT_FRAME_BEGIN = new TransportFrame(CHANNEL_ID, new Begin(), null);
private static final TransportFrame TRANSPORT_FRAME_OPEN = new TransportFrame(CHANNEL_ID, new Open(), null);
- private static final int BUFFER_SIZE = 4096;
+ private static final int BUFFER_SIZE = 8 * 1024;
@Rule
public ExpectedException _expectedException = ExpectedException.none();
@@ -2503,6 +2504,122 @@ public class TransportImplTest
}
}
+ @Test
+ public void testMaxFrameSizeOfPeerHasEffect()
+ {
+ doMaxFrameSizeTestImpl(0, 0, 5700, 1);
+ doMaxFrameSizeTestImpl(1024, 0, 5700, 6);
+ }
+
+ @Test
+ public void testMaxFrameSizeOutgoingFrameSizeLimitHasEffect()
+ {
+ doMaxFrameSizeTestImpl(0, 512, 5700, 12);
+ doMaxFrameSizeTestImpl(1024, 512, 5700, 12);
+ doMaxFrameSizeTestImpl(1024, 2048, 5700, 6);
+ }
+
+ void doMaxFrameSizeTestImpl(int remoteMaxFrameSize, int outboundFrameSizeLimit, int contentLength, int expectedNumFrames)
+ {
+ MockTransportImpl transport = new MockTransportImpl();
+ transport.setEmitFlowEventOnSend(false);
+
+ // If we have been given an outboundFrameSizeLimit, configure it
+ if(outboundFrameSizeLimit != 0) {
+ transport.setOutboundFrameSizeLimit(outboundFrameSizeLimit);
+ }
+
+ Connection connection = Proton.connection();
+ transport.bind(connection);
+
+ Session session = connection.session();
+ session.open();
+
+ String linkName = "mySender";
+ Sender sender = session.sender(linkName);
+ sender.open();
+
+ String messageContent = createLargeContent(contentLength);
+ sendMessage(sender, "tag1", messageContent);
+
+ pumpMockTransport(transport);
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 0, transport.writes.size());
+
+ // Now open the connection, expect the Open, Begin and Attach frames but
+ // no Transfer frames yet, as the peer has not responded or granted credit.
+ connection.open();
+
+ pumpMockTransport(transport);
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
+
+ assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
+ assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin);
+ assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach);
+
+ // Send the necessary responses to open/begin/attach then give sender credit
+ Open open = new Open();
+ if(remoteMaxFrameSize != 0) {
+ open.setMaxFrameSize(UnsignedInteger.valueOf(remoteMaxFrameSize));
+ }
+ transport.handleFrame(new TransportFrame(0, open, null));
+
+ Begin begin = new Begin();
+ begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+ transport.handleFrame(new TransportFrame(0, begin, null));
+
+ Attach attach = new Attach();
+ attach.setHandle(UnsignedInteger.ZERO);
+ attach.setRole(Role.RECEIVER);
+ attach.setName(linkName);
+ attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+ transport.handleFrame(new TransportFrame(0, attach, null));
+
+ Flow flow = new Flow();
+ flow.setHandle(UnsignedInteger.ZERO);
+ flow.setDeliveryCount(UnsignedInteger.ZERO);
+ flow.setNextIncomingId(UnsignedInteger.ONE);
+ flow.setNextOutgoingId(UnsignedInteger.ZERO);
+ flow.setIncomingWindow(UnsignedInteger.valueOf(1024));
+ flow.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+ flow.setLinkCredit(UnsignedInteger.valueOf(10));
+
+ transport.handleFrame(new TransportFrame(0, flow, null));
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
+
+ // Now pump the transport again and expect transfers for the message
+ pumpMockTransport(transport);
+
+ // This calc isn't entirely precise, there is some added performative/frame overhead not
+ // accounted for...but values are chosen to work, and verified here.
+ final int frameCount;
+ if(remoteMaxFrameSize == 0 && outboundFrameSizeLimit == 0) {
+ frameCount = 1;
+ } else if(remoteMaxFrameSize == 0 && outboundFrameSizeLimit != 0) {
+ frameCount = (int) Math.ceil((double)contentLength / (double) outboundFrameSizeLimit);
+ } else {
+ int effectiveMaxFrameSize;
+ if(outboundFrameSizeLimit != 0) {
+ effectiveMaxFrameSize = Math.min(outboundFrameSizeLimit, remoteMaxFrameSize);
+ } else {
+ effectiveMaxFrameSize = remoteMaxFrameSize;
+ }
+
+ frameCount = (int) Math.ceil((double)contentLength / (double) effectiveMaxFrameSize);
+ }
+
+ assertEquals("Unexpected number of frames calculated", expectedNumFrames, frameCount);
+
+ final int start = 3;
+ final int totalExpected = start + frameCount;
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), totalExpected, transport.writes.size());
+ for(int i = start; i < totalExpected; i++) {
+ assertTrue("Unexpected frame type", transport.writes.get(i) instanceof Transfer);
+ }
+ }
+
private void processInput(MockTransportImpl transport, ByteBuffer data) {
while (data.remaining() > 0)
{
@@ -2515,4 +2632,15 @@ public class TransportImplTest
}
}
+
+ private static String createLargeContent(int length) {
+ Random rand = new Random(System.currentTimeMillis());
+
+ byte[] payload = new byte[length];
+ for (int i = 0; i < length; i++) {
+ payload[i] = (byte) (64 + 1 + rand.nextInt(9));
+ }
+
+ return new String(payload, StandardCharsets.UTF_8);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org