You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2023/02/15 16:30:52 UTC
[qpid-protonj2] branch main updated: PROTON-2682 Allow deferred writes to be scripted in tests
This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push:
new 9233deb9 PROTON-2682 Allow deferred writes to be scripted in tests
9233deb9 is described below
commit 9233deb954624bb2abe8ccdda34e6d94e84fac38
Author: Timothy Bish <ta...@gmail.com>
AuthorDate: Wed Feb 15 11:30:42 2023 -0500
PROTON-2682 Allow deferred writes to be scripted in tests
Allows writes to be deferred in order to bunch frame writes into the
same IO write operation.
---
.../qpid/protonj2/test/driver/AMQPTestDriver.java | 136 +++++++++++++++++++--
.../test/driver/DeferrableScriptedAction.java | 45 +++++++
.../protonj2/test/driver/ProtonTestClient.java | 36 ++++++
.../protonj2/test/driver/ProtonTestServer.java | 36 ++++++
.../driver/actions/AMQPHeaderInjectAction.java | 22 +++-
.../actions/AbstractPerformativeInjectAction.java | 21 +++-
.../AbstractSaslPerformativeInjectAction.java | 23 +++-
.../protonj2/test/driver/SenderHandlingTest.java | 42 +++++++
8 files changed, 339 insertions(+), 22 deletions(-)
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java
index d7476f4b..f3bf0e2b 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
import io.netty5.buffer.Buffer;
import io.netty5.buffer.BufferAllocator;
+import io.netty5.buffer.CompositeBuffer;
import io.netty5.channel.EventLoop;
/**
@@ -60,6 +61,7 @@ public class AMQPTestDriver implements Consumer<ByteBuffer> {
private final Consumer<AssertionError> assertionConsumer;
private final Supplier<EventLoop> schedulerSupplier;
+ private volatile CompositeBuffer deferredWrites;
private volatile AssertionError failureCause;
private int advertisedIdleTimeout = 0;
@@ -505,6 +507,81 @@ public class AMQPTestDriver implements Consumer<ByteBuffer> {
sendAMQPFrame(channel, performative, payload, false);
}
+ /**
+ * Encodes the given frame data into a ProtonBuffer but does not write the data until the next non-deferred
+ * send if initiated.
+ *
+ * @param channel
+ * The channel to use when writing the frame
+ * @param performative
+ * The AMQP Performative to write
+ * @param payload
+ * The payload to include in the encoded frame.
+ * @param splitWrite
+ * Should the data be written in multiple chunks
+ */
+ public void deferAMQPFrame(int channel, DescribedType performative, Buffer payload, boolean splitWrite) {
+ LOG.trace("{} Deferring write of performative: {}", driverName, performative);
+ try (Buffer buffer = frameEncoder.handleWrite(performative, channel, payload, null)) {
+ if (deferredWrites == null) {
+ deferredWrites = BufferAllocator.onHeapUnpooled().compose();
+ }
+
+ deferredWrites.extendWith(buffer.send());
+ } catch (Throwable t) {
+ signalFailure(new AssertionError("Frame was not written due to error.", t));
+ }
+ }
+
+ /**
+ * Encodes the given frame data into a ProtonBuffer but does not write the data until the next non-deferred
+ * send if initiated.
+ *
+ * @param channel
+ * The channel to use when writing the frame
+ * @param performative
+ * The SASL Performative to write
+ */
+ public void deferSaslFrame(int channel, DescribedType performative) {
+ // When the outcome of SASL is written the decoder should revert to initial state
+ // as the only valid next incoming value is an AMQP header.
+ if (performative instanceof SaslOutcome) {
+ frameParser.resetToExpectingHeader();
+ }
+
+ LOG.trace("{} Deferring SASL performative write: {}", driverName, performative);
+
+ try (Buffer buffer = frameEncoder.handleWrite(performative, channel)) {
+ if (deferredWrites == null) {
+ deferredWrites = BufferAllocator.onHeapUnpooled().compose();
+ }
+
+ deferredWrites.extendWith(buffer.send());
+ } catch (Throwable t) {
+ signalFailure(new AssertionError("Frame was not written due to error.", t));
+ }
+ }
+
+ /**
+ * Encodes the given Header data into a ProtonBuffer but does not write the data until the next non-deferred
+ * send if initiated.
+ *
+ * @param header
+ * The byte array to send as the AMQP Header.
+ */
+ public void deferHeader(AMQPHeader header) {
+ LOG.trace("{} Deferring AMQP Header write: {}", driverName, header);
+ try {
+ if (deferredWrites == null) {
+ deferredWrites = BufferAllocator.onHeapUnpooled().compose();
+ }
+
+ deferredWrites.extendWith(BufferAllocator.onHeapUnpooled().copyOf(header.getBuffer()).send());
+ } catch (Throwable t) {
+ signalFailure(new AssertionError("Frame was not consumed due to error.", t));
+ }
+ }
+
/**
* Encodes the given frame data into a ProtonBuffer and injects it into the configured consumer.
*
@@ -530,22 +607,32 @@ public class AMQPTestDriver implements Consumer<ByteBuffer> {
}
try (Buffer buffer = frameEncoder.handleWrite(performative, channel, payload, null)) {
- LOG.trace("{} Writing out buffer {} to consumer: {}", driverName, buffer, frameConsumer);
+ final Buffer buffered;
+ if (deferredWrites != null) {
+ deferredWrites.extendWith(buffer.send());
+ buffered = deferredWrites;
+ deferredWrites = null;
+ LOG.trace("{} appending deferred buffer {} to next write.", driverName, buffered);
+ } else {
+ buffered = buffer;
+ }
+
+ LOG.trace("{} Writing out buffer {} to consumer: {}", driverName, buffered, frameConsumer);
if (splitWrite) {
- final int bufferSplitPoint = buffer.readableBytes() / 2;
+ final int bufferSplitPoint = buffered.readableBytes() / 2;
- final ByteBuffer front = ByteBuffer.allocate(bufferSplitPoint - buffer.readerOffset());
- final ByteBuffer rear = ByteBuffer.allocate(buffer.readableBytes() - bufferSplitPoint);
+ final ByteBuffer front = ByteBuffer.allocate(bufferSplitPoint - buffered.readerOffset());
+ final ByteBuffer rear = ByteBuffer.allocate(buffered.readableBytes() - bufferSplitPoint);
- buffer.readBytes(front);
- buffer.readBytes(rear);
+ buffered.readBytes(front);
+ buffered.readBytes(rear);
frameConsumer.accept(front.flip());
frameConsumer.accept(rear.flip());
} else {
- final ByteBuffer output = ByteBuffer.allocate(buffer.readableBytes());
- buffer.readBytes(output);
+ final ByteBuffer output = ByteBuffer.allocate(buffered.readableBytes());
+ buffered.readBytes(output);
frameConsumer.accept(output.flip());
}
@@ -569,11 +656,21 @@ public class AMQPTestDriver implements Consumer<ByteBuffer> {
frameParser.resetToExpectingHeader();
}
- LOG.trace("{} Sending sasl performative: {}", driverName, performative);
-
try (Buffer buffer = frameEncoder.handleWrite(performative, channel)) {
- final ByteBuffer output = ByteBuffer.allocate(buffer.readableBytes());
- buffer.readBytes(output);
+ final Buffer buffered;
+ if (deferredWrites != null) {
+ deferredWrites.extendWith(buffer.send());
+ buffered = deferredWrites;
+ deferredWrites = null;
+ LOG.trace("{} appending deferred buffer {} to next write.", driverName, buffered);
+ } else {
+ buffered = buffer;
+ }
+
+ LOG.trace("{} Sending SASL performative: {}", driverName, performative);
+
+ final ByteBuffer output = ByteBuffer.allocate(buffered.readableBytes());
+ buffered.readBytes(output);
frameConsumer.accept(output.flip());
} catch (Throwable t) {
@@ -588,8 +685,21 @@ public class AMQPTestDriver implements Consumer<ByteBuffer> {
* The byte array to send as the AMQP Header.
*/
public void sendHeader(AMQPHeader header) {
- LOG.trace("{} Sending AMQP Header: {}", driverName, header);
try {
+ final ByteBuffer output;
+
+ if (deferredWrites != null) {
+ LOG.trace("{} appending deferred buffer {} to next write.", driverName, deferredWrites);
+ deferredWrites.extendWith(BufferAllocator.onHeapUnpooled().copyOf(header.getBuffer()).send());
+ output = ByteBuffer.allocate(deferredWrites.readableBytes());
+ deferredWrites.readBytes(output);
+ output.flip();
+ deferredWrites = null;
+ } else {
+ output = ByteBuffer.wrap(header.getBuffer());
+ }
+
+ LOG.trace("{} Sending AMQP Header: {}", driverName, header);
frameConsumer.accept(ByteBuffer.wrap(header.getBuffer()));
} catch (Throwable t) {
signalFailure(new AssertionError("Frame was not consumed due to error.", t));
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/DeferrableScriptedAction.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/DeferrableScriptedAction.java
new file mode 100644
index 00000000..a42253b4
--- /dev/null
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/DeferrableScriptedAction.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.protonj2.test.driver;
+
+/**
+ * A Type of scripted action whose output can be deferred until the next
+ * {@link ScriptedAction} that writes data which is not deferred.
+ */
+public interface DeferrableScriptedAction extends ScriptedAction {
+
+ /**
+ * Configures this action such that any output from the action is
+ * deferred until the next action that executes without the defer
+ * flag set at which time all deferred data will be written along
+ * with the output from the non-deferred action's data.
+ * <p>
+ * Care should be take when deferring output to ensure that at some
+ * point one final non-deferred action is queued or triggered so that
+ * tests do not hang waiting for data that has been forever deferred.
+ *
+ * @return this deferrable scripted action for chaining.
+ */
+ DeferrableScriptedAction deferred();
+
+ /**
+ * @return true if this action's outputs are deferred.
+ */
+ boolean isDeffered();
+
+}
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClient.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClient.java
index d798fb4f..c940d42c 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClient.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClient.java
@@ -125,6 +125,42 @@ public class ProtonTestClient extends ProtonTestPeer implements AutoCloseable {
// and so we direct the resulting action into the event loop to avoid codec or
// other driver resources being used on two different threads.
+ @Override
+ public void deferAMQPFrame(int channel, DescribedType performative, Buffer payload, boolean splitWrite) {
+ EventLoop loop = client.eventLoop();
+ if (loop.inEventLoop()) {
+ super.deferAMQPFrame(channel, performative, payload, splitWrite);
+ } else {
+ loop.execute(() -> {
+ super.deferAMQPFrame(channel, performative, payload, splitWrite);
+ });
+ }
+ }
+
+ @Override
+ public void deferSaslFrame(int channel, DescribedType performative) {
+ EventLoop loop = client.eventLoop();
+ if (loop.inEventLoop()) {
+ super.deferSaslFrame(channel, performative);
+ } else {
+ loop.execute(() -> {
+ super.deferSaslFrame(channel, performative);
+ });
+ }
+ }
+
+ @Override
+ public void deferHeader(AMQPHeader header) {
+ EventLoop loop = client.eventLoop();
+ if (loop.inEventLoop()) {
+ super.deferHeader(header);
+ } else {
+ loop.execute(() -> {
+ super.deferHeader(header);
+ });
+ }
+ }
+
@Override
public void sendAMQPFrame(int channel, DescribedType performative, Buffer payload, boolean splitWrite) {
EventLoop loop = client.eventLoop();
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java
index f3595b93..06b6eb40 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java
@@ -198,6 +198,42 @@ public class ProtonTestServer extends ProtonTestPeer {
// and so we direct the resulting action into the event loop to avoid codec or
// other driver resources being used on two different threads.
+ @Override
+ public void deferAMQPFrame(int channel, DescribedType performative, Buffer payload, boolean splitWrite) {
+ EventLoop loop = server.eventLoop();
+ if (loop.inEventLoop()) {
+ super.deferAMQPFrame(channel, performative, payload, splitWrite);
+ } else {
+ loop.execute(() -> {
+ super.deferAMQPFrame(channel, performative, payload, splitWrite);
+ });
+ }
+ }
+
+ @Override
+ public void deferSaslFrame(int channel, DescribedType performative) {
+ EventLoop loop = server.eventLoop();
+ if (loop.inEventLoop()) {
+ super.deferSaslFrame(channel, performative);
+ } else {
+ loop.execute(() -> {
+ super.deferSaslFrame(channel, performative);
+ });
+ }
+ }
+
+ @Override
+ public void deferHeader(AMQPHeader header) {
+ EventLoop loop = server.eventLoop();
+ if (loop.inEventLoop()) {
+ super.deferHeader(header);
+ } else {
+ loop.execute(() -> {
+ super.deferHeader(header);
+ });
+ }
+ }
+
@Override
public void sendAMQPFrame(int channel, DescribedType performative, Buffer payload, boolean splitWrite) {
EventLoop loop = server.eventLoop();
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/AMQPHeaderInjectAction.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/AMQPHeaderInjectAction.java
index bc9c9309..fcb052f5 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/AMQPHeaderInjectAction.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/AMQPHeaderInjectAction.java
@@ -17,17 +17,18 @@
package org.apache.qpid.protonj2.test.driver.actions;
import org.apache.qpid.protonj2.test.driver.AMQPTestDriver;
-import org.apache.qpid.protonj2.test.driver.ScriptedAction;
+import org.apache.qpid.protonj2.test.driver.DeferrableScriptedAction;
import org.apache.qpid.protonj2.test.driver.codec.transport.AMQPHeader;
/**
* AMQP Header injection action which can be added to a driver for write at a specific time or
* following on from some other action in the test script.
*/
-public class AMQPHeaderInjectAction implements ScriptedAction {
+public class AMQPHeaderInjectAction implements DeferrableScriptedAction {
private final AMQPTestDriver driver;
private final AMQPHeader header;
+ private boolean deferred = false;
public AMQPHeaderInjectAction(AMQPTestDriver driver, AMQPHeader header) {
this.header = header;
@@ -36,7 +37,11 @@ public class AMQPHeaderInjectAction implements ScriptedAction {
@Override
public AMQPHeaderInjectAction perform(AMQPTestDriver driver) {
- driver.sendHeader(header);
+ if (deferred) {
+ driver.deferHeader(header);
+ } else {
+ driver.sendHeader(header);
+ }
return this;
}
@@ -57,4 +62,15 @@ public class AMQPHeaderInjectAction implements ScriptedAction {
driver.addScriptedElement(this);
return this;
}
+
+ @Override
+ public AMQPHeaderInjectAction deferred() {
+ deferred = true;
+ return this;
+ }
+
+ @Override
+ public boolean isDeffered() {
+ return deferred;
+ }
}
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/AbstractPerformativeInjectAction.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/AbstractPerformativeInjectAction.java
index ce763909..25c76487 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/AbstractPerformativeInjectAction.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/AbstractPerformativeInjectAction.java
@@ -17,6 +17,7 @@
package org.apache.qpid.protonj2.test.driver.actions;
import org.apache.qpid.protonj2.test.driver.AMQPTestDriver;
+import org.apache.qpid.protonj2.test.driver.DeferrableScriptedAction;
import org.apache.qpid.protonj2.test.driver.ScriptedAction;
import org.apache.qpid.protonj2.test.driver.codec.primitives.DescribedType;
import org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedShort;
@@ -28,7 +29,7 @@ import io.netty5.buffer.Buffer;
*
* @param <P> the AMQP performative being sent.
*/
-public abstract class AbstractPerformativeInjectAction<P extends DescribedType> implements ScriptedAction {
+public abstract class AbstractPerformativeInjectAction<P extends DescribedType> implements DeferrableScriptedAction {
public static final int CHANNEL_UNSET = -1;
@@ -37,6 +38,7 @@ public abstract class AbstractPerformativeInjectAction<P extends DescribedType>
private int channel = CHANNEL_UNSET;
private int delay = -1;
private boolean splitWrite = false;
+ private boolean deferred = false;
public AbstractPerformativeInjectAction(AMQPTestDriver driver) {
this.driver = driver;
@@ -46,7 +48,11 @@ public abstract class AbstractPerformativeInjectAction<P extends DescribedType>
public final AbstractPerformativeInjectAction<P> now() {
// Give actors a chance to prepare.
beforeActionPerformed(driver);
- driver.sendAMQPFrame(onChannel(), getPerformative(), getPayload(), splitWrite);
+ if (deferred) {
+ driver.deferAMQPFrame(channel, getPerformative(), getPayload(), splitWrite);
+ } else {
+ driver.sendAMQPFrame(onChannel(), getPerformative(), getPayload(), splitWrite);
+ }
return this;
}
@@ -62,6 +68,17 @@ public abstract class AbstractPerformativeInjectAction<P extends DescribedType>
return this;
}
+ @Override
+ public AbstractPerformativeInjectAction<P> deferred() {
+ deferred = true;
+ return this;
+ }
+
+ @Override
+ public boolean isDeffered() {
+ return deferred;
+ }
+
@Override
public final AbstractPerformativeInjectAction<P> perform(AMQPTestDriver driver) {
if (afterDelay() > 0) {
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/AbstractSaslPerformativeInjectAction.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/AbstractSaslPerformativeInjectAction.java
index 0bb9dc69..4c9059ee 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/AbstractSaslPerformativeInjectAction.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/AbstractSaslPerformativeInjectAction.java
@@ -17,7 +17,7 @@
package org.apache.qpid.protonj2.test.driver.actions;
import org.apache.qpid.protonj2.test.driver.AMQPTestDriver;
-import org.apache.qpid.protonj2.test.driver.ScriptedAction;
+import org.apache.qpid.protonj2.test.driver.DeferrableScriptedAction;
import org.apache.qpid.protonj2.test.driver.codec.primitives.DescribedType;
/**
@@ -25,12 +25,12 @@ import org.apache.qpid.protonj2.test.driver.codec.primitives.DescribedType;
*
* @param <P> the SASL performative being sent.
*/
-public abstract class AbstractSaslPerformativeInjectAction<P extends DescribedType> implements ScriptedAction {
+public abstract class AbstractSaslPerformativeInjectAction<P extends DescribedType> implements DeferrableScriptedAction {
public static final int CHANNEL_UNSET = -1;
private final AMQPTestDriver driver;
-
+ private boolean deferred = false;
private int channel = CHANNEL_UNSET;
public AbstractSaslPerformativeInjectAction(AMQPTestDriver driver) {
@@ -55,9 +55,24 @@ public abstract class AbstractSaslPerformativeInjectAction<P extends DescribedTy
return this;
}
+ @Override
+ public AbstractSaslPerformativeInjectAction<P> deferred() {
+ deferred = true;
+ return this;
+ }
+
+ @Override
+ public boolean isDeffered() {
+ return deferred;
+ }
+
@Override
public AbstractSaslPerformativeInjectAction<P> perform(AMQPTestDriver driver) {
- driver.sendSaslFrame(onChannel(), getPerformative());
+ if (deferred) {
+ driver.deferSaslFrame(onChannel(), getPerformative());
+ } else {
+ driver.sendSaslFrame(onChannel(), getPerformative());
+ }
return this;
}
diff --git a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/SenderHandlingTest.java b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/SenderHandlingTest.java
index 59c55971..5f59a6f8 100644
--- a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/SenderHandlingTest.java
+++ b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/SenderHandlingTest.java
@@ -388,4 +388,46 @@ class SenderHandlingTest extends TestPeerTestsBase {
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
+
+ @Test
+ public void testCreateClientConnectionWithPipelinedOpen() throws Exception {
+ try (ProtonTestServer peer = new ProtonTestServer();
+ ProtonTestClient client = new ProtonTestClient()) {
+
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ client.connect(remoteURI.getHost(), remoteURI.getPort());
+
+ // These should not be sent until a non-deferred action is triggered.
+ client.remoteHeader(AMQPHeader.getAMQPHeader()).deferred().now();
+ client.remoteOpen().deferred().now();
+ client.remoteBegin().deferred().now();
+ client.remoteAttach().ofSender().deferred().now();
+
+ client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ peer.expectAMQPHeader().respondWithAMQPHeader();
+ peer.expectOpen().respond();
+ peer.expectBegin().onChannel(0).respond();
+ peer.expectAttach().ofSender().withHandle(0).onChannel(0).respond();
+ peer.expectEnd().onChannel(0).respond();
+
+ client.expectAMQPHeader();
+ client.expectOpen();
+ client.expectBegin().onChannel(0);
+ client.expectAttach().ofReceiver().onChannel(0).withHandle(0);
+ client.expectEnd().onChannel(0);
+ client.remoteEnd().now(); // Trigger all deferred writes
+ client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ client.close();
+
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org