You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2022/08/24 19:44:35 UTC

[qpid-protonj2] branch main updated: PROTON-2593 Add ability to split frame writes to test driver

This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git


The following commit(s) were added to refs/heads/main by this push:
     new a7ce9f43 PROTON-2593 Add ability to split frame writes to test driver
a7ce9f43 is described below

commit a7ce9f4331026a756c7d57ab82b18d8e91f4b647
Author: Timothy Bish <ta...@gmail.com>
AuthorDate: Wed Aug 24 15:44:24 2022 -0400

    PROTON-2593 Add ability to split frame writes to test driver
    
    Allows the test driver performative inject actions to configure that
    they should have the bytes written in split fashion instead of one write
    to validate the frame decoder buffering of incoming incomplete frames
---
 .../qpid/protonj2/client/impl/ReceiverTest.java    | 45 ++++++++++++++++++++++
 .../qpid/protonj2/test/driver/AMQPTestDriver.java  | 31 ++++++++++++++-
 .../protonj2/test/driver/ProtonTestClient.java     |  6 +--
 .../protonj2/test/driver/ProtonTestServer.java     |  6 +--
 .../actions/AbstractPerformativeInjectAction.java  | 12 +++++-
 5 files changed, 91 insertions(+), 9 deletions(-)

diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
index f18a46db..3ecfd5e7 100644
--- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
+++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
@@ -2765,4 +2765,49 @@ public class ReceiverTest extends ImperativeClientTestCase {
            peer.waitForScriptToComplete();
         }
     }
+
+    @Test
+    public void testReceiveTransferWhenRemoteSendsInSplitChunks() throws Exception {
+        final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello World!"));
+
+        try (ProtonTestServer peer = new ProtonTestServer()) {
+            peer.expectSASLAnonymousConnect();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
+            peer.expectFlow().withLinkCredit(10);
+            peer.remoteTransfer().withHandle(0)
+                                 .withDeliveryId(0)
+                                 .withDeliveryTag(new byte[] { 1 })
+                                 .withMore(false)
+                                 .withSettled(true)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue().splitWrite(true);
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            Client container = Client.create();
+            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
+            Session session = connection.openSession();
+            Receiver receiver = session.openReceiver("test-queue");
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            peer.expectDetach().respond();
+            peer.expectClose().respond();
+
+            Delivery delivery = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull(delivery);
+
+            Message<?> message = delivery.message();
+            assertEquals("Hello World!", message.body());
+
+            receiver.closeAsync();
+            connection.closeAsync().get();
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+        }
+    }
 }
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java
index 5d57bc84..d0440b82 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java
@@ -499,7 +499,23 @@ public class AMQPTestDriver implements Consumer<ByteBuffer> {
      * @param payload
      *      The payload to include in the encoded frame.
      */
-    public void sendAMQPFrame(int channel, DescribedType performative, ByteBuf payload) {
+    public final void sendAMQPFrame(int channel, DescribedType performative, ByteBuf payload) {
+        sendAMQPFrame(channel, performative, payload, false);
+    }
+
+    /**
+     * Encodes the given frame data into a ProtonBuffer and injects it into the configured consumer.
+     *
+     * @param channel
+     *      The channel to use when writing the frame
+     * @param performative
+     *      The AMQP Performative to write
+     * @param payload
+     *      The payload to include in the encoded frame.
+     * @param splitWrite
+     * 		Should the data be written in multiple chunks
+     */
+    public void sendAMQPFrame(int channel, DescribedType performative, ByteBuf payload, boolean splitWrite) {
         LOG.trace("{} Sending performative: {}", driverName, performative);
 
         if (performative instanceof PerformativeDescribedType) {
@@ -514,7 +530,18 @@ public class AMQPTestDriver implements Consumer<ByteBuffer> {
         try {
             final ByteBuf buffer = frameEncoder.handleWrite(performative, channel, payload, null);
             LOG.trace("{} Writing out buffer {} to consumer: {}", driverName, buffer, frameConsumer);
-            frameConsumer.accept(buffer.nioBuffer());
+
+            if (splitWrite) {
+                final int bufferSplitPoint = buffer.readableBytes() / 2;
+
+                final ByteBuf front = buffer.slice(buffer.readerIndex(), bufferSplitPoint);
+                final ByteBuf rear = buffer.slice(bufferSplitPoint, buffer.readableBytes() - bufferSplitPoint);
+
+                frameConsumer.accept(front.nioBuffer());
+                frameConsumer.accept(rear.nioBuffer());
+            } else {
+                frameConsumer.accept(buffer.nioBuffer());
+            }
         } catch (Throwable t) {
             signalFailure(new AssertionError("Frame was not written due to error.", t));
         }
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClient.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClient.java
index 324119fc..aca0d6a9 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClient.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClient.java
@@ -128,13 +128,13 @@ public class ProtonTestClient extends ProtonTestPeer implements AutoCloseable {
         // other driver resources being used on two different threads.
 
         @Override
-        public void sendAMQPFrame(int channel, DescribedType performative, ByteBuf payload) {
+        public void sendAMQPFrame(int channel, DescribedType performative, ByteBuf payload, boolean splitWrite) {
             EventLoop loop = client.eventLoop();
             if (loop.inEventLoop()) {
-                super.sendAMQPFrame(channel, performative, payload);
+                super.sendAMQPFrame(channel, performative, payload, splitWrite);
             } else {
                 loop.execute(() -> {
-                    super.sendAMQPFrame(channel, performative, payload);
+                    super.sendAMQPFrame(channel, performative, payload, splitWrite);
                 });
             }
         }
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java
index 6a32e107..1ffc857f 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java
@@ -206,13 +206,13 @@ public class ProtonTestServer extends ProtonTestPeer {
         // other driver resources being used on two different threads.
 
         @Override
-        public void sendAMQPFrame(int channel, DescribedType performative, ByteBuf payload) {
+        public void sendAMQPFrame(int channel, DescribedType performative, ByteBuf payload, boolean splitWrite) {
             EventLoop loop = server.eventLoop();
             if (loop.inEventLoop()) {
-                super.sendAMQPFrame(channel, performative, payload);
+                super.sendAMQPFrame(channel, performative, payload, splitWrite);
             } else {
                 loop.execute(() -> {
-                    super.sendAMQPFrame(channel, performative, payload);
+                    super.sendAMQPFrame(channel, performative, payload, splitWrite);
                 });
             }
         }
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/AbstractPerformativeInjectAction.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/AbstractPerformativeInjectAction.java
index 4f2d4d70..f527067b 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/AbstractPerformativeInjectAction.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/AbstractPerformativeInjectAction.java
@@ -36,6 +36,7 @@ public abstract class AbstractPerformativeInjectAction<P extends DescribedType>
 
     private int channel = CHANNEL_UNSET;
     private int delay = -1;
+    private boolean splitWrite = false;
 
     public AbstractPerformativeInjectAction(AMQPTestDriver driver) {
         this.driver = driver;
@@ -45,7 +46,7 @@ public abstract class AbstractPerformativeInjectAction<P extends DescribedType>
     public final AbstractPerformativeInjectAction<P> now() {
         // Give actors a chance to prepare.
         beforeActionPerformed(driver);
-        driver.sendAMQPFrame(onChannel(), getPerformative(), getPayload());
+        driver.sendAMQPFrame(onChannel(), getPerformative(), getPayload(), splitWrite);
         return this;
     }
 
@@ -121,6 +122,15 @@ public abstract class AbstractPerformativeInjectAction<P extends DescribedType>
         return this;
     }
 
+    public boolean splitWrite() {
+        return this.splitWrite;
+    }
+
+    public AbstractPerformativeInjectAction<?> splitWrite(boolean value) {
+        this.splitWrite = value;
+        return this;
+    }
+
     /**
      * @return the AMQP Performative that is to be sent as a result of this action.
      */


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org