You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2018/06/28 19:10:23 UTC

qpid-jms git commit: QPIDJMS-399 Split write and flush of data into two operations

Repository: qpid-jms
Updated Branches:
  refs/heads/master 856e2b9ac -> 7750a1c27


QPIDJMS-399 Split write and flush of data into two operations

Allow for split write and then flush of outbound data which allows
for quicker release of blocking operations where the flush can be
done after the caller has been notified of success or a batch of
writes could improve performance.


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/7750a1c2
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/7750a1c2
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/7750a1c2

Branch: refs/heads/master
Commit: 7750a1c27589261b197d7b746506e19d8771b145
Parents: 856e2b9
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Jun 28 15:09:45 2018 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Jun 28 15:09:45 2018 -0400

----------------------------------------------------------------------
 .../jms/provider/amqp/AmqpFixedProducer.java    |  4 +-
 .../qpid/jms/provider/amqp/AmqpProvider.java    | 41 ++++++++++++++++++--
 .../apache/qpid/jms/transports/Transport.java   | 25 ++++++++++--
 .../jms/transports/netty/NettyTcpTransport.java | 20 ++++++++--
 .../jms/transports/netty/NettyWsTransport.java  | 15 ++++++-
 .../transports/netty/NettyTcpTransportTest.java | 12 +++---
 .../transports/netty/NettyWsTransportTest.java  | 10 ++---
 7 files changed, 104 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7750a1c2/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index b2036fe..0f50b5c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -166,7 +166,7 @@ public class AmqpFixedProducer extends AmqpProducer {
 
         // Put it on the wire and let it fail if the connection is broken, if it does
         // get written then continue on to determine when we should complete it.
-        if (provider.pumpToProtonTransport(request)) {
+        if (provider.pumpToProtonTransport(request, false)) {
             // For presettled messages we can just mark as successful and we are done, but
             // for any other message we still track it until the remote settles.  If the send
             // was tagged as asynchronous we must mark the original request as complete but
@@ -177,6 +177,8 @@ public class AmqpFixedProducer extends AmqpProducer {
             } else if (envelope.isSendAsync()) {
                 send.getOriginalRequest().onSuccess();
             }
+
+            provider.getTransport().flush();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7750a1c2/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 7153bbe..890ebdb 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -116,6 +116,8 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
     private static final AtomicInteger PROVIDER_SEQUENCE = new AtomicInteger();
     private static final NoOpAsyncResult NOOP_REQUEST = new NoOpAsyncResult();
 
+    private static final int DEFAULT_MAX_WRITE_BYTES_BEFORE_FLUSH = 128 * 1024;
+
     private volatile ProviderListener listener;
     private volatile AmqpConnection connection;
     private AmqpSaslAuthenticator authenticator;
@@ -131,6 +133,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
     private int drainTimeout = 60000;
     private long sessionOutoingWindow = -1; // Use proton default
     private int maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
+    private int maxWriteBytesBeforeFlush = DEFAULT_MAX_WRITE_BYTES_BEFORE_FLUSH;
 
     private boolean allowNonSecureRedirects;
 
@@ -695,8 +698,9 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
                         request.onSuccess();
                         pumpToProtonTransport(request);
                     } else {
-                        pumpToProtonTransport(request);
+                        pumpToProtonTransport(request, false);
                         request.onSuccess();
+                        transport.flush();
                     }
                 } catch (Throwable t) {
                     request.onFailure(t);
@@ -1037,12 +1041,18 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
     }
 
     protected boolean pumpToProtonTransport() {
-        return pumpToProtonTransport(NOOP_REQUEST);
+        return pumpToProtonTransport(NOOP_REQUEST, true);
     }
 
     protected boolean pumpToProtonTransport(AsyncResult request) {
+        return pumpToProtonTransport(request, true);
+    }
+
+    protected boolean pumpToProtonTransport(AsyncResult request, boolean flush) {
         try {
             boolean done = false;
+            int bytesWritten = 0;
+
             while (!done) {
                 ByteBuffer toWrite = protonTransport.getOutputBuffer();
                 if (toWrite != null && toWrite.hasRemaining()) {
@@ -1053,12 +1063,22 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
                         TRACE_BYTES.info("Sending: {}", ByteBufUtil.hexDump(outbound));
                     }
 
-                    transport.send(outbound);
+                    bytesWritten += outbound.readableBytes();
+                    if (flush && bytesWritten >= getMaxWriteBytesBeforeFlush()) {
+                        transport.flush();
+                        bytesWritten = 0;
+                    }
+
+                    transport.write(outbound);
                     protonTransport.outputConsumed();
                 } else {
                     done = true;
                 }
             }
+
+            if (flush && bytesWritten > 0) {
+                transport.flush();
+            }
         } catch (IOException e) {
             fireProviderException(e);
             request.onFailure(e);
@@ -1254,6 +1274,21 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
         return maxFrameSize;
     }
 
+    public int getMaxWriteBytesBeforeFlush() {
+        return maxWriteBytesBeforeFlush;
+    }
+
+    /**
+     * Sets the maximum number of bytes that will be written on a large set of batched writes
+     * before a flush is requested on the {@link Transport}.
+     *
+     * @param maxWriteBytesBeforeFlush
+     * 		number of bytes written before a flush is requested.
+     */
+    public void setMaxWriteBytesBeforeFlush(int maxWriteBytesBeforeFlush) {
+        this.maxWriteBytesBeforeFlush = maxWriteBytesBeforeFlush;
+    }
+
     /**
      * Sets the max frame size (in bytes).
      *

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7750a1c2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/Transport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/Transport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/Transport.java
index b6ad697..5c92b06 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/Transport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/Transport.java
@@ -72,14 +72,33 @@ public interface Transport {
     ByteBuf allocateSendBuffer(int size) throws IOException;
 
     /**
-     * Sends a chunk of data over the Transport connection.
+     * Writes a chunk of data over the Transport connection without performing an
+     * explicit flush on the transport.
      *
      * @param output
      *        The buffer of data that is to be transmitted.
      *
-     * @throws IOException if an error occurs during the send operation.
+     * @throws IOException if an error occurs during the write operation.
      */
-    void send(ByteBuf output) throws IOException;
+    void write(ByteBuf output) throws IOException;
+
+    /**
+     * Writes a chunk of data over the Transport connection and requests a flush of
+     * all pending queued write operations
+     *
+     * @param output
+     *        The buffer of data that is to be transmitted.
+     *
+     * @throws IOException if an error occurs during the write operation.
+     */
+    void writeAndFlush(ByteBuf output) throws IOException;
+
+    /**
+     * Request a flush of all pending writes to the underlying connection.
+     *
+     * @throws IOException if an error occurs during the flush operation.
+     */
+    void flush() throws IOException;
 
     /**
      * Gets the currently set TransportListener instance

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7750a1c2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
index a5b7ef7..ab04a75 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
@@ -262,15 +262,27 @@ public class NettyTcpTransport implements Transport {
     }
 
     @Override
-    public void send(ByteBuf output) throws IOException {
+    public void write(ByteBuf output) throws IOException {
         checkConnected(output);
-
         LOG.trace("Attempted write of: {} bytes", output.readableBytes());
+        channel.write(output);
+    }
 
+    @Override
+    public void writeAndFlush(ByteBuf output) throws IOException {
+        checkConnected(output);
+        LOG.trace("Attempted write and flush of: {} bytes", output.readableBytes());
         channel.writeAndFlush(output);
     }
 
     @Override
+    public void flush() throws IOException {
+        checkConnected();
+        LOG.trace("Attempted flush of pending writes");
+        channel.flush();
+    }
+
+    @Override
     public TransportListener getTransportListener() {
         return listener;
     }
@@ -377,13 +389,13 @@ public class NettyTcpTransport implements Transport {
     //----- State change handlers and checks ---------------------------------//
 
     protected final void checkConnected() throws IOException {
-        if (!connected.get()) {
+        if (!connected.get() || !channel.isActive()) {
             throw new IOException("Cannot send to a non-connected transport.");
         }
     }
 
     private void checkConnected(ByteBuf output) throws IOException {
-        if (!connected.get()) {
+        if (!connected.get() || !channel.isActive()) {
             ReferenceCountUtil.release(output);
             throw new IOException("Cannot send to a non-connected transport.");
         }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7750a1c2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyWsTransport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyWsTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyWsTransport.java
index 93370a3..034dfc3 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyWsTransport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyWsTransport.java
@@ -85,7 +85,7 @@ public class NettyWsTransport extends NettyTcpTransport {
     }
 
     @Override
-    public void send(ByteBuf output) throws IOException {
+    public void write(ByteBuf output) throws IOException {
         checkConnected();
         int length = output.readableBytes();
         if (length == 0) {
@@ -94,6 +94,19 @@ public class NettyWsTransport extends NettyTcpTransport {
 
         LOG.trace("Attempted write of: {} bytes", length);
 
+        channel.write(new BinaryWebSocketFrame(output));
+    }
+
+    @Override
+    public void writeAndFlush(ByteBuf output) throws IOException {
+        checkConnected();
+        int length = output.readableBytes();
+        if (length == 0) {
+            return;
+        }
+
+        LOG.trace("Attempted write and flush of: {} bytes", length);
+
         channel.writeAndFlush(new BinaryWebSocketFrame(output));
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7750a1c2/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
index 777ffbb..f463971 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
@@ -253,7 +253,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
                 Transport transport = createTransport(serverLocation, testListener, createClientOptions());
                 try {
                     transport.connect(null);
-                    transport.send(sendBuffer.copy());
+                    transport.writeAndFlush(sendBuffer.copy());
                     transports.add(transport);
                 } catch (Exception e) {
                     fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
@@ -334,7 +334,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
 
             assertTrue(transport.isConnected());
 
-            transport.send(Unpooled.buffer(0));
+            transport.writeAndFlush(Unpooled.buffer(0));
 
             transport.close();
         }
@@ -367,7 +367,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
                 sendBuffer.writeByte('A');
             }
 
-            transport.send(sendBuffer);
+            transport.writeAndFlush(sendBuffer);
 
             assertTrue(Wait.waitFor(new Wait.Condition() {
                 @Override
@@ -419,7 +419,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
             }
 
             for (int i = 0; i < iterations; ++i) {
-                transport.send(sendBuffer.copy());
+                transport.writeAndFlush(sendBuffer.copy());
             }
 
             assertTrue(Wait.waitFor(new Wait.Condition() {
@@ -460,7 +460,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
 
             ByteBuf sendBuffer = Unpooled.buffer(10);
             try {
-                transport.send(sendBuffer);
+                transport.writeAndFlush(sendBuffer);
                 fail("Should throw on send of closed transport");
             } catch (IOException ex) {
             }
@@ -497,7 +497,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
                 transport.close();
 
                 try {
-                    transport.send(sendBuffer);
+                    transport.writeAndFlush(sendBuffer);
                     fail("Should throw on send of closed transport");
                 } catch (IOException ex) {
                 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7750a1c2/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWsTransportTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWsTransportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWsTransportTest.java
index 993d526..390fd73 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWsTransportTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWsTransportTest.java
@@ -151,7 +151,7 @@ public class NettyWsTransportTest extends NettyTcpTransportTest {
                 transport.setMaxFrameSize(FRAME_SIZE);
                 transport.connect(null);
                 transports.add(transport);
-                transport.send(sendBuffer.copy());
+                transport.writeAndFlush(sendBuffer.copy());
             } catch (Exception e) {
                 fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
             }
@@ -202,7 +202,7 @@ public class NettyWsTransportTest extends NettyTcpTransportTest {
                 transport.setMaxFrameSize(FRAME_SIZE);
                 transport.connect(null);
                 transports.add(transport);
-                transport.send(sendBuffer.copy());
+                transport.writeAndFlush(sendBuffer.copy());
             } catch (Exception e) {
                 fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
             }
@@ -263,7 +263,7 @@ public class NettyWsTransportTest extends NettyTcpTransportTest {
                 transport.setMaxFrameSize(FRAME_SIZE / 2);
                 transport.connect(null);
                 transports.add(transport);
-                transport.send(sendBuffer.copy());
+                transport.writeAndFlush(sendBuffer.copy());
             } catch (Exception e) {
                 fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
             }
@@ -299,7 +299,7 @@ public class NettyWsTransportTest extends NettyTcpTransportTest {
                 transport.setMaxFrameSize(FRAME_SIZE);
                 transport.connect(null);
                 transports.add(transport);
-                transport.send(sendBuffer.copy());
+                transport.writeAndFlush(sendBuffer.copy());
             } catch (Exception e) {
                 fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
             }
@@ -308,7 +308,7 @@ public class NettyWsTransportTest extends NettyTcpTransportTest {
                 @Override
                 public boolean isSatisfied() throws Exception {
                     try {
-                        transport.send(sendBuffer);
+                        transport.writeAndFlush(sendBuffer);
                     } catch (IOException e) {
                         LOG.info("Transport send caught error:", e);
                         return true;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org