You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/11/20 17:13:16 UTC
qpid-broker-j git commit: QPID-8042: [System Tests] Improve
pipe-lining of frames in protocol tests
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 9daed1ed1 -> 2cbb62913
QPID-8042: [System Tests] Improve pipe-lining of frames in protocol tests
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/2cbb6291
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/2cbb6291
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/2cbb6291
Branch: refs/heads/master
Commit: 2cbb629136f9c53a854a8268d3061e8501eb9611
Parents: 9daed1e
Author: Alex Rudyy <or...@apache.org>
Authored: Mon Nov 20 16:55:39 2017 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Mon Nov 20 17:09:37 2017 +0000
----------------------------------------------------------------------
.../qpid/tests/protocol/v0_8/Interaction.java | 4 +-
.../qpid/tests/protocol/v1_0/Interaction.java | 6 +-
.../soleconn/CloseExistingPolicy.java | 6 +-
.../v1_0/transport/security/sasl/SaslTest.java | 2 +-
.../qpid/tests/protocol/FrameTransport.java | 26 +++------
.../apache/qpid/tests/protocol/Interaction.java | 8 +--
.../qpid/tests/protocol/OutputHandler.java | 59 +++++++++++++++++++-
7 files changed, 78 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2cbb6291/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
index 0b62770..29e058c 100644
--- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
@@ -58,13 +58,13 @@ public class Interaction extends org.apache.qpid.tests.protocol.Interaction<Inte
public Interaction sendPerformative(int channel, final AMQBody amqBody) throws Exception
{
final AMQFrame frameBody = new AMQFrame(channel, amqBody);
- sendPerformativeAndChainFuture(frameBody, false);
+ sendPerformativeAndChainFuture(frameBody);
return this;
}
public Interaction sendPerformative(final AMQDataBlock dataBlock) throws Exception
{
- sendPerformativeAndChainFuture(dataBlock, false);
+ sendPerformativeAndChainFuture(dataBlock);
return this;
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2cbb6291/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
index 7d73ce8..4aad6ee 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
@@ -153,7 +153,7 @@ public class Interaction extends org.apache.qpid.tests.protocol.Interaction<Inte
Close close = new Close();
sendPerformative(close, UnsignedShort.valueOf((short) 0));
- Response<?> response = getNextResponse();
+ Response<?> response = consumeResponse().getLatestResponse();
if (!(response.getBody() instanceof Close))
{
throw new IllegalStateException(String.format(
@@ -983,7 +983,7 @@ public class Interaction extends org.apache.qpid.tests.protocol.Interaction<Inte
private void sendPerformativeAndChainFuture(final SaslFrameBody frameBody) throws Exception
{
SASLFrame transportFrame = new SASLFrame(frameBody);
- sendPerformativeAndChainFuture(transportFrame, true);
+ sendPerformativeAndChainFuture(transportFrame);
}
private void sendPerformativeAndChainFuture(final FrameBody frameBody, final UnsignedShort channel) throws Exception
@@ -1001,7 +1001,7 @@ public class Interaction extends org.apache.qpid.tests.protocol.Interaction<Inte
duplicate = payload.duplicate();
}
transportFrame = new TransportFrame(channel.shortValue(), frameBody, duplicate);
- ListenableFuture<Void> listenableFuture = sendPerformativeAndChainFuture(transportFrame, false);
+ ListenableFuture<Void> listenableFuture = sendPerformativeAndChainFuture(transportFrame);
if (frameBody instanceof Transfer)
{
listenableFuture.addListener(() -> ((Transfer) frameBody).dispose(), MoreExecutors.directExecutor());
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2cbb6291/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java
index 4bee689..5be5482 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java
@@ -106,7 +106,8 @@ public class CloseExistingPolicy extends BrokerAdminUsingTestBase
.openDesiredCapabilities(SOLE_CONNECTION_FOR_CONTAINER)
.openProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY,
CLOSE_EXISTING))
- .open();
+ .open()
+ .sync();
final Close close1 = interaction1.consumeResponse().getLatestResponse(Close.class);
assertThat(close1.getError(), is(notNullValue()));
@@ -145,7 +146,8 @@ public class CloseExistingPolicy extends BrokerAdminUsingTestBase
.openDesiredCapabilities(SOLE_CONNECTION_FOR_CONTAINER)
.openProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY,
CLOSE_EXISTING))
- .open();
+ .open()
+ .sync();
final Close close1 = interaction1.consumeResponse().getLatestResponse(Close.class);
assertThat(close1.getError(), is(notNullValue()));
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2cbb6291/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
index d7f3bc1..1ca3f20 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
@@ -258,7 +258,7 @@ public class SaslTest extends BrokerAdminUsingTestBase
assertThat(saslHeaderResponse, is(equalTo(SASL_AMQP_HEADER_BYTES)));
interaction.consumeResponse(SaslMechanisms.class);
- interaction.open();
+ interaction.open().sync();
transport.assertNoMoreResponsesAndChannelClosed();
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2cbb6291/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java
index daf500d..28dc02e 100644
--- a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java
@@ -31,14 +31,12 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
@@ -138,26 +136,15 @@ public abstract class FrameTransport implements AutoCloseable
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
buffer.writeBytes(bytes);
_channel.write(buffer, promise);
- _channel.flush();
return JdkFutureAdapters.listenInPoolThread(promise);
}
- public ListenableFuture<Void> sendPerformative(final Object data, boolean sync) throws Exception
+ public ListenableFuture<Void> sendPerformative(final Object data) throws Exception
{
Preconditions.checkState(_channel != null, "Not connected");
- if (!sync)
- {
- ChannelPromise promise = _channel.newPromise();
- _channel.write(data, promise);
- _channel.flush();
- return JdkFutureAdapters.listenInPoolThread(promise);
- }
- else
- {
- ChannelFuture channelFuture = _channel.writeAndFlush(data);
- channelFuture.sync();
- return Futures.immediateFuture(null);
- }
+ ChannelPromise promise = _channel.newPromise();
+ _channel.write(data, promise);
+ return JdkFutureAdapters.listenInPoolThread(promise);
}
public <T extends Response<?>> T getNextResponse() throws Exception
@@ -177,6 +164,11 @@ public abstract class FrameTransport implements AutoCloseable
assertThat(_channelClosedSeen, is(true));
}
+ public void flush()
+ {
+ _channel.flush();
+ }
+
private static class ChannelClosedResponse implements Response<Void>
{
@Override
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2cbb6291/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
index 2390227..b6e631d 100644
--- a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
@@ -75,6 +75,7 @@ public abstract class Interaction<I extends Interaction>
public I sync() throws InterruptedException, ExecutionException, TimeoutException
{
+ _transport.flush();
if (_latestFuture != null)
{
_latestFuture.get(FrameTransport.RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
@@ -85,14 +86,11 @@ public abstract class Interaction<I extends Interaction>
public Response<?> getLatestResponse() throws Exception
{
- sync();
return _latestResponse;
}
public <T> T getLatestResponse(Class<T> type) throws Exception
{
- sync();
-
if (_latestResponse.getBody() == null)
{
throw new IllegalStateException(String.format("Unexpected response. Expected '%s' got '%s'.",
@@ -110,9 +108,9 @@ public abstract class Interaction<I extends Interaction>
return (T) _latestResponse.getBody();
}
- protected ListenableFuture<Void> sendPerformativeAndChainFuture(final Object frameBody, boolean sync) throws Exception
+ protected ListenableFuture<Void> sendPerformativeAndChainFuture(final Object frameBody) throws Exception
{
- final ListenableFuture<Void> future = _transport.sendPerformative(frameBody, sync);
+ final ListenableFuture<Void> future = _transport.sendPerformative(frameBody);
if (_latestFuture != null)
{
_latestFuture = allAsList(_latestFuture, future);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2cbb6291/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java
index 40a2ca7..5d40447 100644
--- a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java
@@ -21,6 +21,8 @@
package org.apache.qpid.tests.protocol;
import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.Queue;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
@@ -31,10 +33,14 @@ import io.netty.channel.ChannelPromise;
public class OutputHandler extends ChannelOutboundHandlerAdapter
{
private final OutputEncoder _outputEncoder;
+ private Queue<ByteBufferPromisePair> _cachedEncodedFramePromisePairs;
+ private int _encodedSize;
OutputHandler(final OutputEncoder outputEncoder)
{
_outputEncoder = outputEncoder;
+ _cachedEncodedFramePromisePairs = new LinkedList<>();
+ _encodedSize = 0;
}
@Override
@@ -51,12 +57,44 @@ public class OutputHandler extends ChannelOutboundHandlerAdapter
}
}
- private void send(ChannelHandlerContext ctx, final ByteBuffer dataByteBuffer, final ChannelPromise promise)
+ private synchronized void send(ChannelHandlerContext ctx, final ByteBuffer dataByteBuffer, final ChannelPromise promise)
{
- byte[] data = new byte[dataByteBuffer.remaining()];
- dataByteBuffer.get(data);
+ _cachedEncodedFramePromisePairs.add(new ByteBufferPromisePair(dataByteBuffer, promise));
+ _encodedSize += dataByteBuffer.remaining();
+ }
+
+
+ @Override
+ public synchronized void flush(final ChannelHandlerContext ctx) throws Exception
+ {
+ final ChannelPromise promise = ctx.channel().newPromise();
+ byte[] data = new byte[_encodedSize];
+
+ int offset = 0;
+ while(offset < _encodedSize)
+ {
+ ByteBufferPromisePair currentPair = _cachedEncodedFramePromisePairs.poll();
+ int remaining = currentPair.byteBuffer.remaining();
+ currentPair.byteBuffer.get(data, offset, remaining) ;
+ offset += remaining;
+
+ promise.addListener(future -> {
+ if (future.isSuccess())
+ {
+ currentPair.channelPromise.setSuccess();
+ }
+ else
+ {
+ currentPair.channelPromise.setFailure(future.cause());
+ }
+ });
+ }
+
+ _encodedSize = 0;
+
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
buffer.writeBytes(data);
+
try
{
OutputHandler.super.write(ctx, buffer, promise);
@@ -65,5 +103,20 @@ public class OutputHandler extends ChannelOutboundHandlerAdapter
{
promise.setFailure(e);
}
+
+ super.flush(ctx);
+ }
+
+ class ByteBufferPromisePair
+ {
+ private ByteBuffer byteBuffer;
+ private ChannelPromise channelPromise;
+
+ ByteBufferPromisePair(final ByteBuffer byteBuffer, final ChannelPromise channelPromise)
+ {
+ this.byteBuffer = byteBuffer;
+ this.channelPromise = channelPromise;
+ }
}
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org