You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2017/08/26 10:59:32 UTC

qpid-jms git commit: QPIDJMS-315: Add support for Netty KQueue transport

Repository: qpid-jms
Updated Branches:
  refs/heads/master 2646bdae0 -> 86b678e83


QPIDJMS-315: Add support for Netty KQueue transport

This closes #12.


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/86b678e8
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/86b678e8
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/86b678e8

Branch: refs/heads/master
Commit: 86b678e835097fa18acb8419f232dab2a2eb4a69
Parents: 2646bda
Author: Michael Andre Pearce <Mi...@me.com>
Authored: Thu Aug 24 08:11:56 2017 +0100
Committer: Robbie Gemmell <ro...@apache.org>
Committed: Sat Aug 26 11:52:04 2017 +0100

----------------------------------------------------------------------
 pom.xml                                         |  7 ++
 qpid-jms-client/pom.xml                         |  5 ++
 .../qpid/jms/transports/TransportOptions.java   | 19 +++++
 .../jms/transports/netty/NettyTcpTransport.java | 13 +++-
 .../transports/netty/NettyTcpTransportTest.java | 76 +++++++++++++++++++-
 qpid-jms-docs/Configuration.md                  |  1 +
 6 files changed, 117 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/86b678e8/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 28ea51e..7214929 100644
--- a/pom.xml
+++ b/pom.xml
@@ -73,6 +73,7 @@
     <argLine>-Xmx2g -enableassertions ${jacoco-config}</argLine>
 
     <netty-transport-native-epoll-classifier>linux-x86_64</netty-transport-native-epoll-classifier>
+    <netty-transport-native-kqueue-classifier>osx-x86_64</netty-transport-native-kqueue-classifier>
   </properties>
 
   <issueManagement>
@@ -148,6 +149,12 @@
       </dependency>
       <dependency>
         <groupId>io.netty</groupId>
+        <artifactId>netty-transport-native-kqueue</artifactId>
+        <version>${netty-version}</version>
+        <classifier>${netty-transport-native-kqueue-classifier}</classifier>
+      </dependency>
+      <dependency>
+        <groupId>io.netty</groupId>
         <artifactId>netty-codec-http</artifactId>
         <version>${netty-version}</version>
       </dependency>

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/86b678e8/qpid-jms-client/pom.xml
----------------------------------------------------------------------
diff --git a/qpid-jms-client/pom.xml b/qpid-jms-client/pom.xml
index f7fd3e6..5b578c4 100644
--- a/qpid-jms-client/pom.xml
+++ b/qpid-jms-client/pom.xml
@@ -67,6 +67,11 @@
     </dependency>
     <dependency>
       <groupId>io.netty</groupId>
+      <artifactId>netty-transport-native-kqueue</artifactId>
+      <classifier>${netty-transport-native-kqueue-classifier}</classifier>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
       <artifactId>netty-codec-http</artifactId>
     </dependency>
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/86b678e8/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportOptions.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportOptions.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportOptions.java
index deecd6d..3367e6c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportOptions.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportOptions.java
@@ -31,6 +31,7 @@ public class TransportOptions implements Cloneable {
     public static final int DEFAULT_CONNECT_TIMEOUT = 60000;
     public static final int DEFAULT_TCP_PORT = 5672;
     public static final boolean DEFAULT_USE_EPOLL = true;
+    public static final boolean DEFAULT_USE_KQUEUE = false;
     public static final boolean DEFAULT_TRACE_BYTES = false;
 
     private int sendBufferSize = DEFAULT_SEND_BUFFER_SIZE;
@@ -43,6 +44,7 @@ public class TransportOptions implements Cloneable {
     private boolean tcpNoDelay = DEFAULT_TCP_NO_DELAY;
     private int defaultTcpPort = DEFAULT_TCP_PORT;
     private boolean useEpoll = DEFAULT_USE_EPOLL;
+    private boolean useKQueue = DEFAULT_USE_KQUEUE;
     private boolean traceBytes = DEFAULT_TRACE_BYTES;
 
     /**
@@ -184,6 +186,23 @@ public class TransportOptions implements Cloneable {
     }
 
     /**
+     * @return the true if use of of the netty kqueue transport is used.
+     */
+    public boolean isUseKQueue() {
+        return useKQueue;
+    }
+
+    /**
+     * Determines if the netty kqueue transport can be used if available on this platform.
+     *
+     * @param useKQueue
+     * 		should use of available kqueue transport be used.
+     */
+    public void setUseKQueue(boolean useKQueue) {
+        this.useKQueue = useKQueue;
+    }
+
+    /**
      * @return true if the transport should enable byte tracing
      */
     public boolean isTraceBytes() {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/86b678e8/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
index da77be8..58ee073 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
@@ -50,6 +50,9 @@ import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.epoll.Epoll;
 import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.kqueue.KQueue;
+import io.netty.channel.kqueue.KQueueEventLoopGroup;
+import io.netty.channel.kqueue.KQueueSocketChannel;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.handler.logging.LoggingHandler;
@@ -139,9 +142,13 @@ public class NettyTcpTransport implements Transport {
             sslHandler = null;
         }
 
+        boolean useKQueue = getTransportOptions().isUseKQueue() && KQueue.isAvailable();
         boolean useEpoll = getTransportOptions().isUseEpoll() && Epoll.isAvailable();
 
-        if (useEpoll) {
+        if (useKQueue) {
+            LOG.trace("Netty Transport using KQueue mode");
+            group = new KQueueEventLoopGroup(1);
+        } else if (useEpoll) {
             LOG.trace("Netty Transport using Epoll mode");
             group = new EpollEventLoopGroup(1);
         } else {
@@ -151,7 +158,9 @@ public class NettyTcpTransport implements Transport {
 
         bootstrap = new Bootstrap();
         bootstrap.group(group);
-        if (useEpoll) {
+        if (useKQueue) {
+            bootstrap.channel(KQueueSocketChannel.class);
+        } else if (useEpoll) {
             bootstrap.channel(EpollSocketChannel.class);
         } else {
             bootstrap.channel(NioSocketChannel.class);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/86b678e8/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
index af354b2..68a5cb5 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
@@ -31,6 +31,9 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.kqueue.KQueue;
+import io.netty.channel.kqueue.KQueueEventLoopGroup;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
 import org.apache.qpid.jms.test.Wait;
 import org.apache.qpid.jms.transports.Transport;
@@ -43,7 +46,6 @@ import org.slf4j.LoggerFactory;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
-import io.netty.channel.epoll.Epoll;
 import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.util.ResourceLeakDetector;
 import io.netty.util.ResourceLeakDetector.Level;
@@ -525,7 +527,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
 
             TransportOptions options = createClientOptions();
             options.setUseEpoll(useEpoll);
-
+            options.setUseKQueue(false);
             Transport transport = createTransport(serverLocation, testListener, options);
             try {
                 transport.connect(null);
@@ -574,6 +576,76 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
         }
     }
 
+    @Test(timeout = 60 * 1000)
+    public void testConnectToServerWithKQueueEnabled() throws Exception {
+        doTestKQueueSupport(true);
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testConnectToServerWithKQueueDisabled() throws Exception {
+        doTestKQueueSupport(false);
+    }
+
+    private void doTestKQueueSupport(boolean useKQueue) throws Exception {
+        assumeTrue(KQueue.isAvailable());
+
+        try (NettyEchoServer server = createEchoServer(createServerOptions())) {
+            server.start();
+
+            int port = server.getServerPort();
+            URI serverLocation = new URI("tcp://localhost:" + port);
+
+            TransportOptions options = createClientOptions();
+            options.setUseKQueue(useKQueue);
+            options.setUseEpoll(false);
+            Transport transport = createTransport(serverLocation, testListener, options);
+            try {
+                transport.connect(null);
+                LOG.info("Connected to server:{} as expected.", serverLocation);
+            } catch (Exception e) {
+                fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
+            }
+
+            assertTrue(transport.isConnected());
+            assertEquals(serverLocation, transport.getRemoteLocation());
+            assertKQueue("Transport should be using Kqueue", useKQueue, transport);
+
+            transport.close();
+
+            // Additional close should not fail or cause other problems.
+            transport.close();
+        }
+
+        assertTrue(!transportClosed);  // Normal shutdown does not trigger the event.
+        assertTrue(exceptions.isEmpty());
+        assertTrue(data.isEmpty());
+    }
+
+    private void assertKQueue(String message, boolean expected, Transport transport) throws Exception {
+        Field group = null;
+        Class<?> transportType = transport.getClass();
+
+        while (transportType != null && group == null) {
+            try {
+                group = transportType.getDeclaredField("group");
+            } catch (NoSuchFieldException error) {
+                transportType = transportType.getSuperclass();
+                if (Object.class.equals(transportType)) {
+                    transportType = null;
+                }
+            }
+        }
+
+        assertNotNull("Transport implementation unknown", group);
+
+        group.setAccessible(true);
+        if (expected) {
+            assertTrue(message, group.get(transport) instanceof KQueueEventLoopGroup);
+        } else {
+            assertFalse(message, group.get(transport) instanceof KQueueEventLoopGroup);
+        }
+    }
+
     protected Transport createTransport(URI serverLocation, TransportListener listener, TransportOptions options) {
         if (listener == null) {
             return new NettyTcpTransport(serverLocation, options);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/86b678e8/qpid-jms-docs/Configuration.md
----------------------------------------------------------------------
diff --git a/qpid-jms-docs/Configuration.md b/qpid-jms-docs/Configuration.md
index 6e5ee81..b950b5c 100644
--- a/qpid-jms-docs/Configuration.md
+++ b/qpid-jms-docs/Configuration.md
@@ -180,6 +180,7 @@ The complete set of SSL Transport options is listed below:
 + **transport.verifyHost** Whether to verify that the hostname being connected to matches with the provided server certificate. Defaults to true.
 + **transport.keyAlias** The alias to use when selecting a keypair from the keystore if required to send a client certificate to the server. No default.
 + **transport.useEpoll** When true the transport will use the native Epoll layer when available instead of the NIO layer, which can improve performance. Defaults to true.
++ **transport.useKQueue** When true the transport will use the native KQueue layer when available instead of the NIO layer, which can improve performance. Defaults to false.
 
 ### Websocket Transport Configuration options
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org