You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2023/02/15 16:30:52 UTC

[qpid-protonj2] branch main updated: PROTON-2682 Allow deferred writes to be scripted in tests

This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git


The following commit(s) were added to refs/heads/main by this push:
     new 9233deb9 PROTON-2682 Allow deferred writes to be scripted in tests
9233deb9 is described below

commit 9233deb954624bb2abe8ccdda34e6d94e84fac38
Author: Timothy Bish <ta...@gmail.com>
AuthorDate: Wed Feb 15 11:30:42 2023 -0500

    PROTON-2682 Allow deferred writes to be scripted in tests
    
    Allows writes to be deferred in order to bunch frame writes into the
    same IO write operation.
---
 .../qpid/protonj2/test/driver/AMQPTestDriver.java  | 136 +++++++++++++++++++--
 .../test/driver/DeferrableScriptedAction.java      |  45 +++++++
 .../protonj2/test/driver/ProtonTestClient.java     |  36 ++++++
 .../protonj2/test/driver/ProtonTestServer.java     |  36 ++++++
 .../driver/actions/AMQPHeaderInjectAction.java     |  22 +++-
 .../actions/AbstractPerformativeInjectAction.java  |  21 +++-
 .../AbstractSaslPerformativeInjectAction.java      |  23 +++-
 .../protonj2/test/driver/SenderHandlingTest.java   |  42 +++++++
 8 files changed, 339 insertions(+), 22 deletions(-)

diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java
index d7476f4b..f3bf0e2b 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
 
 import io.netty5.buffer.Buffer;
 import io.netty5.buffer.BufferAllocator;
+import io.netty5.buffer.CompositeBuffer;
 import io.netty5.channel.EventLoop;
 
 /**
@@ -60,6 +61,7 @@ public class AMQPTestDriver implements Consumer<ByteBuffer> {
     private final Consumer<AssertionError> assertionConsumer;
     private final Supplier<EventLoop> schedulerSupplier;
 
+    private volatile CompositeBuffer deferredWrites;
     private volatile AssertionError failureCause;
 
     private int advertisedIdleTimeout = 0;
@@ -505,6 +507,81 @@ public class AMQPTestDriver implements Consumer<ByteBuffer> {
         sendAMQPFrame(channel, performative, payload, false);
     }
 
+    /**
+     * Encodes the given frame data into a buffer but does not write the data until the next non-deferred
+     * send is initiated.
+     *
+     * @param channel
+     *      The channel to use when writing the frame
+     * @param performative
+     *      The AMQP Performative to write
+     * @param payload
+     *      The payload to include in the encoded frame.
+     * @param splitWrite
+     *      Not consulted by this method; the encoded frame is only appended to the deferred buffer.
+     */
+    public void deferAMQPFrame(int channel, DescribedType performative, Buffer payload, boolean splitWrite) {
+        LOG.trace("{} Deferring write of performative: {}", driverName, performative);
+        try (Buffer buffer = frameEncoder.handleWrite(performative, channel, payload, null)) {
+            if (deferredWrites == null) {
+                deferredWrites = BufferAllocator.onHeapUnpooled().compose();
+            }
+            // Append behind any frames deferred earlier; a later non-deferred send writes them all out.
+            deferredWrites.extendWith(buffer.send());
+        } catch (Throwable t) {
+            signalFailure(new AssertionError("Frame was not written due to error.", t));
+        }
+    }
+
+    /**
+     * Encodes the given SASL frame into a buffer but does not write the data until the next non-deferred
+     * send is initiated.
+     *
+     * @param channel
+     *      The channel to use when writing the frame
+     * @param performative
+     *      The SASL Performative to write
+     */
+    public void deferSaslFrame(int channel, DescribedType performative) {
+        // When the outcome of SASL is written the decoder should revert to initial state as the only valid
+        // next incoming value is an AMQP header. Note the reset happens here, when the frame is deferred.
+        if (performative instanceof SaslOutcome) {
+            frameParser.resetToExpectingHeader();
+        }
+
+        LOG.trace("{} Deferring SASL performative write: {}", driverName, performative);
+
+        try (Buffer buffer = frameEncoder.handleWrite(performative, channel)) {
+            if (deferredWrites == null) {
+                deferredWrites = BufferAllocator.onHeapUnpooled().compose();
+            }
+            // Append behind any frames deferred earlier; a later non-deferred send writes them all out.
+            deferredWrites.extendWith(buffer.send());
+        } catch (Throwable t) {
+            signalFailure(new AssertionError("Frame was not written due to error.", t));
+        }
+    }
+
+    /**
+     * Copies the bytes of the given AMQP Header into the deferred buffer but does not write the data until
+     * the next non-deferred send is initiated.
+     *
+     * @param header
+     *      The AMQP Header whose bytes are to be sent.
+     */
+    public void deferHeader(AMQPHeader header) {
+        LOG.trace("{} Deferring AMQP Header write: {}", driverName, header);
+        try {
+            if (deferredWrites == null) {
+                deferredWrites = BufferAllocator.onHeapUnpooled().compose();
+            }
+            // A copy of the header bytes is appended behind any data deferred earlier.
+            deferredWrites.extendWith(BufferAllocator.onHeapUnpooled().copyOf(header.getBuffer()).send());
+        } catch (Throwable t) {
+            signalFailure(new AssertionError("Frame was not consumed due to error.", t));
+        }
+    }
+
     /**
      * Encodes the given frame data into a ProtonBuffer and injects it into the configured consumer.
      *
@@ -530,22 +607,32 @@ public class AMQPTestDriver implements Consumer<ByteBuffer> {
         }
 
         try (Buffer buffer = frameEncoder.handleWrite(performative, channel, payload, null)) {
-            LOG.trace("{} Writing out buffer {} to consumer: {}", driverName, buffer, frameConsumer);
+            final Buffer buffered;
+            if (deferredWrites != null) {
+                deferredWrites.extendWith(buffer.send());
+                buffered = deferredWrites;
+                deferredWrites = null;
+                LOG.trace("{} appending deferred buffer {} to next write.", driverName, buffered);
+            } else {
+                buffered = buffer;
+            }
+
+            LOG.trace("{} Writing out buffer {} to consumer: {}", driverName, buffered, frameConsumer);
 
             if (splitWrite) {
-                final int bufferSplitPoint = buffer.readableBytes() / 2;
+                final int bufferSplitPoint = buffered.readableBytes() / 2;
 
-                final ByteBuffer front = ByteBuffer.allocate(bufferSplitPoint - buffer.readerOffset());
-                final ByteBuffer rear = ByteBuffer.allocate(buffer.readableBytes() - bufferSplitPoint);
+                final ByteBuffer front = ByteBuffer.allocate(bufferSplitPoint - buffered.readerOffset());
+                final ByteBuffer rear = ByteBuffer.allocate(buffered.readableBytes() - bufferSplitPoint);
 
-                buffer.readBytes(front);
-                buffer.readBytes(rear);
+                buffered.readBytes(front);
+                buffered.readBytes(rear);
 
                 frameConsumer.accept(front.flip());
                 frameConsumer.accept(rear.flip());
             } else {
-                final ByteBuffer output = ByteBuffer.allocate(buffer.readableBytes());
-                buffer.readBytes(output);
+                final ByteBuffer output = ByteBuffer.allocate(buffered.readableBytes());
+                buffered.readBytes(output);
 
                 frameConsumer.accept(output.flip());
             }
@@ -569,11 +656,21 @@ public class AMQPTestDriver implements Consumer<ByteBuffer> {
             frameParser.resetToExpectingHeader();
         }
 
-        LOG.trace("{} Sending sasl performative: {}", driverName, performative);
-
         try (Buffer buffer = frameEncoder.handleWrite(performative, channel)) {
-            final ByteBuffer output = ByteBuffer.allocate(buffer.readableBytes());
-            buffer.readBytes(output);
+            final Buffer buffered;
+            if (deferredWrites != null) {
+                deferredWrites.extendWith(buffer.send());
+                buffered = deferredWrites;
+                deferredWrites = null;
+                LOG.trace("{} appending deferred buffer {} to next write.", driverName, buffered);
+            } else {
+                buffered = buffer;
+            }
+
+            LOG.trace("{} Sending SASL performative: {}", driverName, performative);
+
+            final ByteBuffer output = ByteBuffer.allocate(buffered.readableBytes());
+            buffered.readBytes(output);
 
             frameConsumer.accept(output.flip());
         } catch (Throwable t) {
@@ -588,8 +685,21 @@ public class AMQPTestDriver implements Consumer<ByteBuffer> {
      *      The byte array to send as the AMQP Header.
      */
     public void sendHeader(AMQPHeader header) {
-        LOG.trace("{} Sending AMQP Header: {}", driverName, header);
         try {
+            final ByteBuffer output;
+            // Any deferred data is flushed ahead of the header so both reach the consumer in one write.
+            if (deferredWrites != null) {
+                LOG.trace("{} appending deferred buffer {} to next write.", driverName, deferredWrites);
+                deferredWrites.extendWith(BufferAllocator.onHeapUnpooled().copyOf(header.getBuffer()).send());
+                output = ByteBuffer.allocate(deferredWrites.readableBytes());
+                deferredWrites.readBytes(output);
+                output.flip();
+                deferredWrites = null;
+            } else {
+                output = ByteBuffer.wrap(header.getBuffer());
+            }
+            // Hand over 'output' rather than a fresh wrap of the header, otherwise the deferred bytes are lost.
+            LOG.trace("{} Sending AMQP Header: {}", driverName, header);
+            frameConsumer.accept(output);
         } catch (Throwable t) {
             signalFailure(new AssertionError("Frame was not consumed due to error.", t));
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/DeferrableScriptedAction.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/DeferrableScriptedAction.java
new file mode 100644
index 00000000..a42253b4
--- /dev/null
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/DeferrableScriptedAction.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.protonj2.test.driver;
+
+/**
+ * A type of scripted action whose output can be deferred until the next
+ * {@link ScriptedAction} that writes data which is not deferred.
+ */
+public interface DeferrableScriptedAction extends ScriptedAction {
+
+    /**
+     * Configures this action such that any output from the action is
+     * deferred until the next action that executes without the defer
+     * flag set, at which time all deferred data will be written along
+     * with the non-deferred action's own output.
+     * <p>
+     * Care should be taken when deferring output to ensure that at some
+     * point one final non-deferred action is queued or triggered so that
+     * tests do not hang waiting for data that has been forever deferred.
+     *
+     * @return this deferrable scripted action for chaining.
+     */
+    DeferrableScriptedAction deferred();
+
+    /**
+     * @return true if this action's outputs are deferred (note the method name is spelled "isDeffered").
+     */
+    boolean isDeffered();
+
+}
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClient.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClient.java
index d798fb4f..c940d42c 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClient.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClient.java
@@ -125,6 +125,42 @@ public class ProtonTestClient extends ProtonTestPeer implements AutoCloseable {
         // and so we direct the resulting action into the event loop to avoid codec or
         // other driver resources being used on two different threads.
 
+        @Override
+        public void deferAMQPFrame(int channel, DescribedType performative, Buffer payload, boolean splitWrite) {
+            // Run inline when already on the client's event loop, otherwise hand the call over to it.
+            final EventLoop eventLoop = client.eventLoop();
+            final Runnable deferral = () -> super.deferAMQPFrame(channel, performative, payload, splitWrite);
+            if (eventLoop.inEventLoop()) {
+                deferral.run();
+            } else {
+                eventLoop.execute(deferral);
+            }
+        }
+
+        @Override
+        public void deferSaslFrame(int channel, DescribedType performative) {
+            final EventLoop eventLoop = client.eventLoop();
+            if (!eventLoop.inEventLoop()) {
+                // Re-dispatch so driver and codec resources are only used from the event loop thread.
+                eventLoop.execute(() -> super.deferSaslFrame(channel, performative));
+                return;
+            }
+
+            super.deferSaslFrame(channel, performative);
+        }
+
+        @Override
+        public void deferHeader(AMQPHeader header) {
+            final EventLoop eventLoop = client.eventLoop();
+            final boolean onEventLoop = eventLoop.inEventLoop();
+            // Callers on another thread are redirected into the client's event loop.
+            if (onEventLoop) {
+                super.deferHeader(header);
+            } else {
+                eventLoop.execute(() -> super.deferHeader(header));
+            }
+        }
+
         @Override
         public void sendAMQPFrame(int channel, DescribedType performative, Buffer payload, boolean splitWrite) {
             EventLoop loop = client.eventLoop();
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java
index f3595b93..06b6eb40 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java
@@ -198,6 +198,42 @@ public class ProtonTestServer extends ProtonTestPeer {
         // and so we direct the resulting action into the event loop to avoid codec or
         // other driver resources being used on two different threads.
 
+        @Override
+        public void deferAMQPFrame(int channel, DescribedType performative, Buffer payload, boolean splitWrite) {
+            // Run inline when already on the server's event loop, otherwise hand the call over to it.
+            final EventLoop eventLoop = server.eventLoop();
+            final Runnable deferral = () -> super.deferAMQPFrame(channel, performative, payload, splitWrite);
+            if (eventLoop.inEventLoop()) {
+                deferral.run();
+            } else {
+                eventLoop.execute(deferral);
+            }
+        }
+
+        @Override
+        public void deferSaslFrame(int channel, DescribedType performative) {
+            final EventLoop eventLoop = server.eventLoop();
+            if (!eventLoop.inEventLoop()) {
+                // Re-dispatch so driver and codec resources are only used from the event loop thread.
+                eventLoop.execute(() -> super.deferSaslFrame(channel, performative));
+                return;
+            }
+
+            super.deferSaslFrame(channel, performative);
+        }
+
+        @Override
+        public void deferHeader(AMQPHeader header) {
+            final EventLoop eventLoop = server.eventLoop();
+            final boolean onEventLoop = eventLoop.inEventLoop();
+            // Callers on another thread are redirected into the server's event loop.
+            if (onEventLoop) {
+                super.deferHeader(header);
+            } else {
+                eventLoop.execute(() -> super.deferHeader(header));
+            }
+        }
+
         @Override
         public void sendAMQPFrame(int channel, DescribedType performative, Buffer payload, boolean splitWrite) {
             EventLoop loop = server.eventLoop();
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/AMQPHeaderInjectAction.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/AMQPHeaderInjectAction.java
index bc9c9309..fcb052f5 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/AMQPHeaderInjectAction.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/AMQPHeaderInjectAction.java
@@ -17,17 +17,18 @@
 package org.apache.qpid.protonj2.test.driver.actions;
 
 import org.apache.qpid.protonj2.test.driver.AMQPTestDriver;
-import org.apache.qpid.protonj2.test.driver.ScriptedAction;
+import org.apache.qpid.protonj2.test.driver.DeferrableScriptedAction;
 import org.apache.qpid.protonj2.test.driver.codec.transport.AMQPHeader;
 
 /**
  * AMQP Header injection action which can be added to a driver for write at a specific time or
  * following on from some other action in the test script.
  */
-public class AMQPHeaderInjectAction implements ScriptedAction {
+public class AMQPHeaderInjectAction implements DeferrableScriptedAction {
 
     private final AMQPTestDriver driver;
     private final AMQPHeader header;
+    private boolean deferred = false;
 
     public AMQPHeaderInjectAction(AMQPTestDriver driver, AMQPHeader header) {
         this.header = header;
@@ -36,7 +37,11 @@ public class AMQPHeaderInjectAction implements ScriptedAction {
 
     @Override
     public AMQPHeaderInjectAction perform(AMQPTestDriver driver) {
-        driver.sendHeader(header);
+        if (deferred) {
+            driver.deferHeader(header);
+        } else {
+            driver.sendHeader(header);
+        }
         return this;
     }
 
@@ -57,4 +62,15 @@ public class AMQPHeaderInjectAction implements ScriptedAction {
         driver.addScriptedElement(this);
         return this;
     }
+
+    @Override
+    public AMQPHeaderInjectAction deferred() {
+        deferred = true;
+        return this;
+    }
+
+    @Override
+    public boolean isDeffered() {
+        return deferred;
+    }
 }
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/AbstractPerformativeInjectAction.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/AbstractPerformativeInjectAction.java
index ce763909..25c76487 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/AbstractPerformativeInjectAction.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/AbstractPerformativeInjectAction.java
@@ -17,6 +17,7 @@
 package org.apache.qpid.protonj2.test.driver.actions;
 
 import org.apache.qpid.protonj2.test.driver.AMQPTestDriver;
+import org.apache.qpid.protonj2.test.driver.DeferrableScriptedAction;
 import org.apache.qpid.protonj2.test.driver.ScriptedAction;
 import org.apache.qpid.protonj2.test.driver.codec.primitives.DescribedType;
 import org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedShort;
@@ -28,7 +29,7 @@ import io.netty5.buffer.Buffer;
  *
  * @param <P> the AMQP performative being sent.
  */
-public abstract class AbstractPerformativeInjectAction<P extends DescribedType> implements ScriptedAction {
+public abstract class AbstractPerformativeInjectAction<P extends DescribedType> implements DeferrableScriptedAction {
 
     public static final int CHANNEL_UNSET = -1;
 
@@ -37,6 +38,7 @@ public abstract class AbstractPerformativeInjectAction<P extends DescribedType>
     private int channel = CHANNEL_UNSET;
     private int delay = -1;
     private boolean splitWrite = false;
+    private boolean deferred = false;
 
     public AbstractPerformativeInjectAction(AMQPTestDriver driver) {
         this.driver = driver;
@@ -46,7 +48,11 @@ public abstract class AbstractPerformativeInjectAction<P extends DescribedType>
     public final AbstractPerformativeInjectAction<P> now() {
         // Give actors a chance to prepare.
         beforeActionPerformed(driver);
-        driver.sendAMQPFrame(onChannel(), getPerformative(), getPayload(), splitWrite);
+        if (deferred) { // hold the frame back until a later non-deferred send flushes it
+            driver.deferAMQPFrame(onChannel(), getPerformative(), getPayload(), splitWrite);
+        } else {
+            driver.sendAMQPFrame(onChannel(), getPerformative(), getPayload(), splitWrite);
+        }
         return this;
     }
 
@@ -62,6 +68,17 @@ public abstract class AbstractPerformativeInjectAction<P extends DescribedType>
         return this;
     }
 
+    @Override
+    public AbstractPerformativeInjectAction<P> deferred() {
+        deferred = true;
+        return this;
+    }
+
+    @Override
+    public boolean isDeffered() {
+        return deferred;
+    }
+
     @Override
     public final AbstractPerformativeInjectAction<P> perform(AMQPTestDriver driver) {
         if (afterDelay() > 0) {
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/AbstractSaslPerformativeInjectAction.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/AbstractSaslPerformativeInjectAction.java
index 0bb9dc69..4c9059ee 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/AbstractSaslPerformativeInjectAction.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/AbstractSaslPerformativeInjectAction.java
@@ -17,7 +17,7 @@
 package org.apache.qpid.protonj2.test.driver.actions;
 
 import org.apache.qpid.protonj2.test.driver.AMQPTestDriver;
-import org.apache.qpid.protonj2.test.driver.ScriptedAction;
+import org.apache.qpid.protonj2.test.driver.DeferrableScriptedAction;
 import org.apache.qpid.protonj2.test.driver.codec.primitives.DescribedType;
 
 /**
@@ -25,12 +25,12 @@ import org.apache.qpid.protonj2.test.driver.codec.primitives.DescribedType;
  *
  * @param <P> the SASL performative being sent.
  */
-public abstract class AbstractSaslPerformativeInjectAction<P extends DescribedType> implements ScriptedAction {
+public abstract class AbstractSaslPerformativeInjectAction<P extends DescribedType> implements DeferrableScriptedAction {
 
     public static final int CHANNEL_UNSET = -1;
 
     private final AMQPTestDriver driver;
-
+    private boolean deferred = false;
     private int channel = CHANNEL_UNSET;
 
     public AbstractSaslPerformativeInjectAction(AMQPTestDriver driver) {
@@ -55,9 +55,24 @@ public abstract class AbstractSaslPerformativeInjectAction<P extends DescribedTy
         return this;
     }
 
+    @Override
+    public AbstractSaslPerformativeInjectAction<P> deferred() {
+        deferred = true;
+        return this;
+    }
+
+    @Override
+    public boolean isDeffered() {
+        return deferred;
+    }
+
     @Override
     public AbstractSaslPerformativeInjectAction<P> perform(AMQPTestDriver driver) {
-        driver.sendSaslFrame(onChannel(), getPerformative());
+        if (deferred) {
+            driver.deferSaslFrame(onChannel(), getPerformative());
+        } else {
+            driver.sendSaslFrame(onChannel(), getPerformative());
+        }
         return this;
     }
 
diff --git a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/SenderHandlingTest.java b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/SenderHandlingTest.java
index 59c55971..5f59a6f8 100644
--- a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/SenderHandlingTest.java
+++ b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/SenderHandlingTest.java
@@ -388,4 +388,46 @@ class SenderHandlingTest extends TestPeerTestsBase {
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
         }
     }
+
+    @Test
+    public void testCreateClientConnectionWithPipelinedOpen() throws Exception {
+        try (ProtonTestServer peer = new ProtonTestServer();
+             ProtonTestClient client = new ProtonTestClient()) {
+
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            client.connect(remoteURI.getHost(), remoteURI.getPort());
+
+            // These should not be sent until a non-deferred action is triggered.
+            client.remoteHeader(AMQPHeader.getAMQPHeader()).deferred().now();
+            client.remoteOpen().deferred().now();
+            client.remoteBegin().deferred().now();
+            client.remoteAttach().ofSender().deferred().now();
+
+            client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            // Script the frames the server should receive and the reply it gives to each.
+            peer.expectAMQPHeader().respondWithAMQPHeader();
+            peer.expectOpen().respond();
+            peer.expectBegin().onChannel(0).respond();
+            peer.expectAttach().ofSender().withHandle(0).onChannel(0).respond();
+            peer.expectEnd().onChannel(0).respond();
+            // Script the frames the client should then receive back from the server.
+            client.expectAMQPHeader();
+            client.expectOpen();
+            client.expectBegin().onChannel(0);
+            client.expectAttach().ofReceiver().onChannel(0).withHandle(0);
+            client.expectEnd().onChannel(0);
+            client.remoteEnd().now(); // Trigger all deferred writes
+            client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            client.close();
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org