You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/11/20 17:13:16 UTC

qpid-broker-j git commit: QPID-8042: [System Tests] Improve pipe-lining of frames in protocol tests

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 9daed1ed1 -> 2cbb62913


QPID-8042: [System Tests] Improve pipe-lining of frames in protocol tests


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/2cbb6291
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/2cbb6291
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/2cbb6291

Branch: refs/heads/master
Commit: 2cbb629136f9c53a854a8268d3061e8501eb9611
Parents: 9daed1e
Author: Alex Rudyy <or...@apache.org>
Authored: Mon Nov 20 16:55:39 2017 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Mon Nov 20 17:09:37 2017 +0000

----------------------------------------------------------------------
 .../qpid/tests/protocol/v0_8/Interaction.java   |  4 +-
 .../qpid/tests/protocol/v1_0/Interaction.java   |  6 +-
 .../soleconn/CloseExistingPolicy.java           |  6 +-
 .../v1_0/transport/security/sasl/SaslTest.java  |  2 +-
 .../qpid/tests/protocol/FrameTransport.java     | 26 +++------
 .../apache/qpid/tests/protocol/Interaction.java |  8 +--
 .../qpid/tests/protocol/OutputHandler.java      | 59 +++++++++++++++++++-
 7 files changed, 78 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2cbb6291/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
index 0b62770..29e058c 100644
--- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
@@ -58,13 +58,13 @@ public class Interaction extends org.apache.qpid.tests.protocol.Interaction<Inte
     public Interaction sendPerformative(int channel, final AMQBody amqBody) throws Exception
     {
         final AMQFrame frameBody = new AMQFrame(channel, amqBody);
-        sendPerformativeAndChainFuture(frameBody, false);
+        sendPerformativeAndChainFuture(frameBody);
         return this;
     }
 
     public Interaction sendPerformative(final AMQDataBlock dataBlock) throws Exception
     {
-        sendPerformativeAndChainFuture(dataBlock, false);
+        sendPerformativeAndChainFuture(dataBlock);
         return this;
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2cbb6291/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
index 7d73ce8..4aad6ee 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
@@ -153,7 +153,7 @@ public class Interaction extends org.apache.qpid.tests.protocol.Interaction<Inte
         Close close = new Close();
 
         sendPerformative(close, UnsignedShort.valueOf((short) 0));
-        Response<?> response = getNextResponse();
+        Response<?> response = consumeResponse().getLatestResponse();
         if (!(response.getBody() instanceof Close))
         {
             throw new IllegalStateException(String.format(
@@ -983,7 +983,7 @@ public class Interaction extends org.apache.qpid.tests.protocol.Interaction<Inte
     private void sendPerformativeAndChainFuture(final SaslFrameBody frameBody) throws Exception
     {
         SASLFrame transportFrame = new SASLFrame(frameBody);
-        sendPerformativeAndChainFuture(transportFrame, true);
+        sendPerformativeAndChainFuture(transportFrame);
     }
 
     private void sendPerformativeAndChainFuture(final FrameBody frameBody, final UnsignedShort channel) throws Exception
@@ -1001,7 +1001,7 @@ public class Interaction extends org.apache.qpid.tests.protocol.Interaction<Inte
                 duplicate = payload.duplicate();
             }
             transportFrame = new TransportFrame(channel.shortValue(), frameBody, duplicate);
-            ListenableFuture<Void> listenableFuture = sendPerformativeAndChainFuture(transportFrame, false);
+            ListenableFuture<Void> listenableFuture = sendPerformativeAndChainFuture(transportFrame);
             if (frameBody instanceof Transfer)
             {
                 listenableFuture.addListener(() -> ((Transfer) frameBody).dispose(), MoreExecutors.directExecutor());

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2cbb6291/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java
index 4bee689..5be5482 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java
@@ -106,7 +106,8 @@ public class CloseExistingPolicy extends BrokerAdminUsingTestBase
                             .openDesiredCapabilities(SOLE_CONNECTION_FOR_CONTAINER)
                             .openProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY,
                                                                      CLOSE_EXISTING))
-                            .open();
+                            .open()
+                            .sync();
 
                 final Close close1 = interaction1.consumeResponse().getLatestResponse(Close.class);
                 assertThat(close1.getError(), is(notNullValue()));
@@ -145,7 +146,8 @@ public class CloseExistingPolicy extends BrokerAdminUsingTestBase
                             .openDesiredCapabilities(SOLE_CONNECTION_FOR_CONTAINER)
                             .openProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY,
                                                                      CLOSE_EXISTING))
-                            .open();
+                            .open()
+                            .sync();
 
                 final Close close1 = interaction1.consumeResponse().getLatestResponse(Close.class);
                 assertThat(close1.getError(), is(notNullValue()));

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2cbb6291/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
index d7f3bc1..1ca3f20 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
@@ -258,7 +258,7 @@ public class SaslTest extends BrokerAdminUsingTestBase
             assertThat(saslHeaderResponse, is(equalTo(SASL_AMQP_HEADER_BYTES)));
 
             interaction.consumeResponse(SaslMechanisms.class);
-            interaction.open();
+            interaction.open().sync();
 
             transport.assertNoMoreResponsesAndChannelClosed();
         }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2cbb6291/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java
index daf500d..28dc02e 100644
--- a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java
@@ -31,14 +31,12 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.JdkFutureAdapters;
 import com.google.common.util.concurrent.ListenableFuture;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.ChannelPipeline;
@@ -138,26 +136,15 @@ public abstract class FrameTransport implements AutoCloseable
         ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
         buffer.writeBytes(bytes);
         _channel.write(buffer, promise);
-        _channel.flush();
         return JdkFutureAdapters.listenInPoolThread(promise);
     }
 
-    public ListenableFuture<Void> sendPerformative(final Object data, boolean sync) throws Exception
+    public ListenableFuture<Void> sendPerformative(final Object data) throws Exception
     {
         Preconditions.checkState(_channel != null, "Not connected");
-        if (!sync)
-        {
-            ChannelPromise promise = _channel.newPromise();
-            _channel.write(data, promise);
-            _channel.flush();
-            return JdkFutureAdapters.listenInPoolThread(promise);
-        }
-        else
-        {
-            ChannelFuture channelFuture = _channel.writeAndFlush(data);
-            channelFuture.sync();
-            return Futures.immediateFuture(null);
-        }
+        ChannelPromise promise = _channel.newPromise();
+        _channel.write(data, promise);
+        return JdkFutureAdapters.listenInPoolThread(promise);
     }
 
     public <T extends Response<?>> T getNextResponse() throws Exception
@@ -177,6 +164,11 @@ public abstract class FrameTransport implements AutoCloseable
         assertThat(_channelClosedSeen, is(true));
     }
 
+    public void flush()
+    {
+        _channel.flush();
+    }
+
     private static class ChannelClosedResponse implements Response<Void>
     {
         @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2cbb6291/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
index 2390227..b6e631d 100644
--- a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
@@ -75,6 +75,7 @@ public abstract class Interaction<I extends Interaction>
 
     public I sync() throws InterruptedException, ExecutionException, TimeoutException
     {
+        _transport.flush();
         if (_latestFuture != null)
         {
             _latestFuture.get(FrameTransport.RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
@@ -85,14 +86,11 @@ public abstract class Interaction<I extends Interaction>
 
     public Response<?> getLatestResponse() throws Exception
     {
-        sync();
         return _latestResponse;
     }
 
     public <T> T getLatestResponse(Class<T> type) throws Exception
     {
-        sync();
-
         if (_latestResponse.getBody() == null)
         {
             throw new IllegalStateException(String.format("Unexpected response. Expected '%s' got '%s'.",
@@ -110,9 +108,9 @@ public abstract class Interaction<I extends Interaction>
         return (T) _latestResponse.getBody();
     }
 
-    protected ListenableFuture<Void> sendPerformativeAndChainFuture(final Object frameBody, boolean sync) throws Exception
+    protected ListenableFuture<Void> sendPerformativeAndChainFuture(final Object frameBody) throws Exception
     {
-        final ListenableFuture<Void> future = _transport.sendPerformative(frameBody, sync);
+        final ListenableFuture<Void> future = _transport.sendPerformative(frameBody);
         if (_latestFuture != null)
         {
             _latestFuture = allAsList(_latestFuture, future);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2cbb6291/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java
index 40a2ca7..5d40447 100644
--- a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java
@@ -21,6 +21,8 @@
 package org.apache.qpid.tests.protocol;
 
 import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.Queue;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
@@ -31,10 +33,14 @@ import io.netty.channel.ChannelPromise;
 public class OutputHandler extends ChannelOutboundHandlerAdapter
 {
     private final OutputEncoder _outputEncoder;
+    private Queue<ByteBufferPromisePair> _cachedEncodedFramePromisePairs;
+    private int _encodedSize;
 
     OutputHandler(final OutputEncoder outputEncoder)
     {
         _outputEncoder = outputEncoder;
+        _cachedEncodedFramePromisePairs = new LinkedList<>();
+        _encodedSize = 0;
     }
 
     @Override
@@ -51,12 +57,44 @@ public class OutputHandler extends ChannelOutboundHandlerAdapter
         }
     }
 
-    private void send(ChannelHandlerContext ctx, final ByteBuffer dataByteBuffer, final ChannelPromise promise)
+    private synchronized void send(ChannelHandlerContext ctx, final ByteBuffer dataByteBuffer, final ChannelPromise promise)
     {
-        byte[] data = new byte[dataByteBuffer.remaining()];
-        dataByteBuffer.get(data);
+        _cachedEncodedFramePromisePairs.add(new ByteBufferPromisePair(dataByteBuffer, promise));
+        _encodedSize += dataByteBuffer.remaining();
+    }
+
+
+    @Override
+    public synchronized void flush(final ChannelHandlerContext ctx) throws Exception
+    {
+        final ChannelPromise promise = ctx.channel().newPromise();
+        byte[] data  = new byte[_encodedSize];
+
+        int offset = 0;
+        while(offset < _encodedSize)
+        {
+            ByteBufferPromisePair currentPair = _cachedEncodedFramePromisePairs.poll();
+            int remaining = currentPair.byteBuffer.remaining();
+            currentPair.byteBuffer.get(data, offset, remaining) ;
+            offset += remaining;
+
+            promise.addListener(future -> {
+                if (future.isSuccess())
+                {
+                    currentPair.channelPromise.setSuccess();
+                }
+                else
+                {
+                    currentPair.channelPromise.setFailure(future.cause());
+                }
+            });
+        }
+
+        _encodedSize = 0;
+
         ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
         buffer.writeBytes(data);
+
         try
         {
             OutputHandler.super.write(ctx, buffer, promise);
@@ -65,5 +103,20 @@ public class OutputHandler extends ChannelOutboundHandlerAdapter
         {
             promise.setFailure(e);
         }
+
+        super.flush(ctx);
+    }
+
+    class ByteBufferPromisePair
+    {
+        private ByteBuffer byteBuffer;
+        private ChannelPromise channelPromise;
+
+        ByteBufferPromisePair(final ByteBuffer byteBuffer, final ChannelPromise channelPromise)
+        {
+            this.byteBuffer = byteBuffer;
+            this.channelPromise = channelPromise;
+        }
     }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org