You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2017/05/02 15:25:49 UTC

qpid-jms git commit: QPIDJMS-290 Pass along the maxFrameSize setting to the WS Transport

Repository: qpid-jms
Updated Branches:
  refs/heads/master 70fe1b882 -> f255f7742


QPIDJMS-290 Pass along the maxFrameSize setting to the WS Transport

The WS transport needs to know what the AMQP max frame size value is
that's been sent and update the limit imposed by the WS frame parser
before we connect in order to allow incoming frames that are the max
size, or those larger than the default 65535 that Netty uses.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/f255f774
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/f255f774
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/f255f774

Branch: refs/heads/master
Commit: f255f7742f2f708c80778f3b6139daae26f89740
Parents: 70fe1b8
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue May 2 11:23:10 2017 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue May 2 11:23:10 2017 -0400

----------------------------------------------------------------------
 .../qpid/jms/provider/amqp/AmqpProvider.java    |   1 +
 .../apache/qpid/jms/transports/Transport.java   |  17 +++
 .../jms/transports/netty/NettyTcpTransport.java |  18 ++-
 .../jms/transports/netty/NettyWsTransport.java  |   3 +-
 .../qpid/jms/transports/netty/NettyServer.java  |  12 +-
 .../transports/netty/NettyWsTransportTest.java  | 130 ++++++++++++++++++-
 .../apache/qpid/jms/JmsWSConnectionTest.java    | 100 +++++++++++++-
 7 files changed, 272 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f255f774/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 3b577a0..463e2fe 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -191,6 +191,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
                     SSLContext sslContextOverride = connectionInfo.getSslContextOverride();
 
                     transport.setTransportListener(AmqpProvider.this);
+                    transport.setMaxFrameSize(maxFrameSize);
                     transport.connect(sslContextOverride);
 
                     if (saslLayer) {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f255f774/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/Transport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/Transport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/Transport.java
index c0eaf6f..b6ad697 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/Transport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/Transport.java
@@ -114,4 +114,21 @@ public interface Transport {
      */
     Principal getLocalPrincipal();
 
+    /**
+     * Sets the Maximum Frame Size the transport should accept from the remote.  This option
+     * is not applicable to all transport types, those that support validating the incoming
+     * frame size should apply the configured value.
+     *
+     * @param maxFrameSize
+     * 		The maximum frame size to accept from the remote.
+     */
+    void setMaxFrameSize(int maxFrameSize);
+
+    /**
+     * Returns the currently configured maximum frame size setting.
+     *
+     * @return the current max frame size setting for this transport.
+     */
+    int getMaxFrameSize();
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f255f774/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
index 22aaf46..1f17069 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
@@ -64,12 +64,14 @@ public class NettyTcpTransport implements Transport {
 
     private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransport.class);
 
-    private static final int SHUTDOWN_TIMEOUT = 50;
+    public static final int SHUTDOWN_TIMEOUT = 50;
+    public static final int DEFAULT_MAX_FRAME_SIZE = 65535;
 
     protected Bootstrap bootstrap;
     protected EventLoopGroup group;
     protected Channel channel;
     protected TransportListener listener;
+    protected int maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
 
     private final TransportOptions options;
     private final URI remote;
@@ -290,6 +292,20 @@ public class NettyTcpTransport implements Transport {
         return result;
     }
 
+    @Override
+    public void setMaxFrameSize(int maxFrameSize) {
+        if (connected.get()) {
+            throw new IllegalStateException("Cannot change Max Frame Size while connected.");
+        }
+
+        this.maxFrameSize = maxFrameSize;
+    }
+
+    @Override
+    public int getMaxFrameSize() {
+        return maxFrameSize;
+    }
+
     //----- Internal implementation details, can be overridden as needed -----//
 
     protected String getRemoteHost() {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f255f774/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyWsTransport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyWsTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyWsTransport.java
index 3eff143..d0871d2 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyWsTransport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyWsTransport.java
@@ -115,7 +115,8 @@ public class NettyWsTransport extends NettyTcpTransport {
 
         public NettyWebSocketTransportHandler() {
             handshaker = WebSocketClientHandshakerFactory.newHandshaker(
-                getRemoteLocation(), WebSocketVersion.V13, AMQP_SUB_PROTOCOL, true, new DefaultHttpHeaders());
+                getRemoteLocation(), WebSocketVersion.V13, AMQP_SUB_PROTOCOL,
+                true, new DefaultHttpHeaders(), getMaxFrameSize());
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f255f774/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyServer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyServer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyServer.java
index e7a3460..dd383a6 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyServer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyServer.java
@@ -85,6 +85,7 @@ public abstract class NettyServer implements AutoCloseable {
     private int serverPort;
     private final boolean needClientAuth;
     private final boolean webSocketServer;
+    private int maxFrameSize = NettyTcpTransport.DEFAULT_MAX_FRAME_SIZE;
     private String webSocketPath = WEBSOCKET_PATH;
     private volatile SslHandler sslHandler;
 
@@ -120,6 +121,14 @@ public abstract class NettyServer implements AutoCloseable {
         this.webSocketPath = webSocketPath;
     }
 
+    public int getMaxFrameSize() {
+        return maxFrameSize;
+    }
+
+    public void setMaxFrameSize(int maxFrameSize) {
+        this.maxFrameSize = maxFrameSize;
+    }
+
     protected URI getConnectionURI() throws Exception {
         if (!started.get()) {
             throw new IllegalStateException("Cannot get URI of non-started server");
@@ -183,7 +192,7 @@ public abstract class NettyServer implements AutoCloseable {
                     if (webSocketServer) {
                         ch.pipeline().addLast(new HttpServerCodec());
                         ch.pipeline().addLast(new HttpObjectAggregator(65536));
-                        ch.pipeline().addLast(new WebSocketServerProtocolHandler(getWebSocketPath(), "amqp", true));
+                        ch.pipeline().addLast(new WebSocketServerProtocolHandler(getWebSocketPath(), "amqp", true, maxFrameSize));
                     }
 
                     ch.pipeline().addLast(new NettyServerOutboundHandler());
@@ -270,7 +279,6 @@ public abstract class NettyServer implements AutoCloseable {
                     public void operationComplete(Future<Channel> future) throws Exception {
                         LOG.info("Server -> SSL handshake completed. Succeeded: {}", future.isSuccess());
                         if (!future.isSuccess()) {
-                            sslHandler.close();
                             ctx.close();
                         }
                     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f255f774/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWsTransportTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWsTransportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWsTransportTest.java
index 5201381..a50728d 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWsTransportTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWsTransportTest.java
@@ -22,7 +22,10 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
 
+import org.apache.qpid.jms.test.Wait;
 import org.apache.qpid.jms.transports.Transport;
 import org.apache.qpid.jms.transports.TransportListener;
 import org.apache.qpid.jms.transports.TransportOptions;
@@ -30,6 +33,9 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
 /**
  * Test the Netty based WebSocket Transport
  */
@@ -51,7 +57,7 @@ public class NettyWsTransportTest extends NettyTcpTransportTest {
         }
     }
 
-    @Test(timeout = 60 * 1000)
+    @Test(timeout = 60000)
     public void testConnectToServerUsingCorrectPath() throws Exception {
         final String WEBSOCKET_PATH = "/testpath";
 
@@ -84,7 +90,7 @@ public class NettyWsTransportTest extends NettyTcpTransportTest {
         assertTrue(data.isEmpty());
     }
 
-    @Test(timeout = 60 * 1000)
+    @Test(timeout = 60000)
     public void testConnectToServerUsingIncorrectPath() throws Exception {
         final String WEBSOCKET_PATH = "/testpath";
 
@@ -114,4 +120,124 @@ public class NettyWsTransportTest extends NettyTcpTransportTest {
         assertTrue(exceptions.isEmpty());
         assertTrue(data.isEmpty());
     }
+
+    @Test(timeout = 60000)
+    public void testConnectionsSendReceiveLargeDataWhenFrameSizeAllowsIt() throws Exception {
+        final int FRAME_SIZE = 8192;
+
+        ByteBuf sendBuffer = Unpooled.buffer(FRAME_SIZE);
+        for (int i = 0; i < FRAME_SIZE; ++i) {
+            sendBuffer.writeByte('A');
+        }
+
+        try (NettyEchoServer server = createEchoServer(createServerOptions())) {
+            // Server should pass the data through without issue with this size
+            server.setMaxFrameSize(FRAME_SIZE);
+            server.start();
+
+            int port = server.getServerPort();
+            URI serverLocation = new URI("tcp://localhost:" + port);
+
+            List<Transport> transports = new ArrayList<Transport>();
+
+            Transport transport = createTransport(serverLocation, testListener, createClientOptions());
+            try {
+                // The transport should allow for the size of data we sent.
+                transport.setMaxFrameSize(FRAME_SIZE);
+                transport.connect(null);
+                transport.send(sendBuffer.copy());
+                transports.add(transport);
+            } catch (Exception e) {
+                fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
+            }
+
+            assertTrue(Wait.waitFor(new Wait.Condition() {
+
+                @Override
+                public boolean isSatisified() throws Exception {
+                    LOG.debug("Checking completion: read {} expecting {}", bytesRead.get(), FRAME_SIZE);
+                    return bytesRead.get() == FRAME_SIZE || !transport.isConnected();
+                }
+            }));
+
+            assertTrue("Connection failed while receiving.", transport.isConnected());
+
+            transport.close();
+        }
+
+        assertTrue(exceptions.isEmpty());
+    }
+
+    @Test(timeout = 60000)
+    public void testConnectionsSendReceiveLargeDataFailsDueToMaxFrameSize() throws Exception {
+        final int FRAME_SIZE = 1024;
+
+        ByteBuf sendBuffer = Unpooled.buffer(FRAME_SIZE);
+        for (int i = 0; i < FRAME_SIZE; ++i) {
+            sendBuffer.writeByte('A');
+        }
+
+        try (NettyEchoServer server = createEchoServer(createServerOptions())) {
+            // Server should pass the data through, client should choke on the incoming size.
+            server.setMaxFrameSize(FRAME_SIZE);
+            server.start();
+
+            int port = server.getServerPort();
+            URI serverLocation = new URI("tcp://localhost:" + port);
+
+            List<Transport> transports = new ArrayList<Transport>();
+
+            Transport transport = createTransport(serverLocation, testListener, createClientOptions());
+            try {
+                // Transport can't receive anything bigger so it should fail the connection
+                // when data arrives that is larger than this value.
+                transport.setMaxFrameSize(FRAME_SIZE / 2);
+                transport.connect(null);
+                transport.send(sendBuffer.copy());
+                transports.add(transport);
+            } catch (Exception e) {
+                fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
+            }
+
+            assertTrue("Transport should have lost connection", Wait.waitFor(() -> !transport.isConnected()));
+        }
+
+        assertFalse(exceptions.isEmpty());
+    }
+
+    @Test(timeout = 60000)
+    public void testTransportDetectsConnectionDropWhenServerEnforcesMaxFrameSize() throws Exception {
+        final int FRAME_SIZE = 1024;
+
+        ByteBuf sendBuffer = Unpooled.buffer(FRAME_SIZE);
+        for (int i = 0; i < FRAME_SIZE; ++i) {
+            sendBuffer.writeByte('A');
+        }
+
+        try (NettyEchoServer server = createEchoServer(createServerOptions())) {
+            // Server won't accept the data as it's to large and will close the connection.
+            server.setMaxFrameSize(FRAME_SIZE / 2);
+            server.start();
+
+            int port = server.getServerPort();
+            URI serverLocation = new URI("tcp://localhost:" + port);
+
+            List<Transport> transports = new ArrayList<Transport>();
+
+            Transport transport = createTransport(serverLocation, testListener, createClientOptions());
+            try {
+                // Transport allows bigger frames in so that server is the one causing the failure.
+                transport.setMaxFrameSize(FRAME_SIZE);
+                transport.connect(null);
+                transport.send(sendBuffer.copy());
+                transports.add(transport);
+            } catch (Exception e) {
+                fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
+            }
+
+            assertTrue("Transport should have lost connection", Wait.waitFor(() -> !transport.isConnected()));
+        }
+
+        assertFalse(exceptions.isEmpty());
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f255f774/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsWSConnectionTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsWSConnectionTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsWSConnectionTest.java
index c362c33..7002e26 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsWSConnectionTest.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsWSConnectionTest.java
@@ -17,18 +17,30 @@
 package org.apache.qpid.jms;
 
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.net.ServerSocket;
 import java.net.URI;
 
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
 import javax.net.ServerSocketFactory;
 
+import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,6 +51,9 @@ public class JmsWSConnectionTest {
 
     protected static final Logger LOG = LoggerFactory.getLogger(JmsWSConnectionTest.class);
 
+    @Rule
+    public TestName testName = new TestName();
+
     private BrokerService brokerService;
     private URI connectionURI;
     private final int DEFAULT_WS_PORT = 5679;
@@ -51,7 +66,8 @@ public class JmsWSConnectionTest {
         brokerService.setDeleteAllMessagesOnStartup(true);
         brokerService.setUseJmx(false);
 
-        TransportConnector connector = brokerService.addConnector("ws://0.0.0.0:" + getProxyPort());
+        TransportConnector connector = brokerService.addConnector(
+                "ws://0.0.0.0:" + getProxyPort() + "?websocket.maxBinaryMessageSize=1048576");
         connectionURI = connector.getPublishableConnectURI();
         LOG.debug("Using amqp+ws connection: {}", connectionURI);
 
@@ -65,7 +81,7 @@ public class JmsWSConnectionTest {
         brokerService.waitUntilStopped();
     }
 
-    @Test(timeout=30000)
+    @Test(timeout = 30000)
     public void testCreateConnectionAndStart() throws Exception {
         JmsConnectionFactory factory = new JmsConnectionFactory(getConnectionURI());
         JmsConnection connection = (JmsConnection) factory.createConnection();
@@ -74,8 +90,86 @@ public class JmsWSConnectionTest {
         connection.close();
     }
 
+    @Test(timeout = 30000)
+    public void testSendLargeMessageToClientFromOpenWire() throws Exception {
+        JmsConnectionFactory factory = new JmsConnectionFactory(getConnectionURI());
+        JmsConnection connection = (JmsConnection) factory.createConnection();
+
+        sendLargeMessageViaOpenWire();
+
+        try {
+            Session session = connection.createSession();
+            Queue queue = session.createQueue(getQueueName());
+            connection.start();
+
+            MessageConsumer consumer = session.createConsumer(queue);
+            Message message = consumer.receive(1000);
+
+            assertNotNull(message);
+            assertTrue(message instanceof BytesMessage);
+        } finally {
+            connection.close();
+        }
+    }
+
+    @Ignore("Broker is not respecting max binary message size")
+    @Test(timeout = 30000)
+    public void testSendLargeMessageToClientFromAMQP() throws Exception {
+        JmsConnectionFactory factory = new JmsConnectionFactory(getConnectionURI());
+        JmsConnection connection = (JmsConnection) factory.createConnection();
+
+        sendLargeMessageViaAMQP();
+
+        try {
+            Session session = connection.createSession();
+            Queue queue = session.createQueue(getQueueName());
+            connection.start();
+
+            MessageConsumer consumer = session.createConsumer(queue);
+            Message message = consumer.receive(1000);
+
+            assertNotNull(message);
+            assertTrue(message instanceof BytesMessage);
+        } finally {
+            connection.close();
+        }
+    }
+
+    protected void sendLargeMessageViaOpenWire() throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?create=false");
+        doSendLargeMessageViaOpenWire(factory.createConnection());
+    }
+
+    protected void sendLargeMessageViaAMQP() throws Exception {
+        JmsConnectionFactory factory = new JmsConnectionFactory(getConnectionURI());
+        doSendLargeMessageViaOpenWire(factory.createConnection());
+    }
+
+    protected void doSendLargeMessageViaOpenWire(Connection connection) throws Exception {
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue(getQueueName());
+            MessageProducer producer = session.createProducer(queue);
+
+            byte[] payload = new byte[1024 * 1024];
+            for (int i = 0; i < payload.length; ++i) {
+                payload[i] = (byte) (i % 256);
+            }
+            BytesMessage message = session.createBytesMessage();
+            message.writeBytes(payload);
+
+            producer.send(message);
+        } finally {
+            connection.close();
+        }
+    }
+
+    protected String getQueueName() {
+        return testName.getMethodName();
+    }
+
     protected String getConnectionURI() throws Exception {
-        return "amqpws://localhost:" + connectionURI.getPort();
+        return "amqpws://localhost:" + connectionURI.getPort() + "?transport.traceBytes=true";
     }
 
     protected int getProxyPort() {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org