You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2017/08/26 10:59:32 UTC
qpid-jms git commit: QPIDJMS-315: Add support for Netty KQueue
transport
Repository: qpid-jms
Updated Branches:
refs/heads/master 2646bdae0 -> 86b678e83
QPIDJMS-315: Add support for Netty KQueue transport
This closes #12.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/86b678e8
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/86b678e8
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/86b678e8
Branch: refs/heads/master
Commit: 86b678e835097fa18acb8419f232dab2a2eb4a69
Parents: 2646bda
Author: Michael Andre Pearce <Mi...@me.com>
Authored: Thu Aug 24 08:11:56 2017 +0100
Committer: Robbie Gemmell <ro...@apache.org>
Committed: Sat Aug 26 11:52:04 2017 +0100
----------------------------------------------------------------------
pom.xml | 7 ++
qpid-jms-client/pom.xml | 5 ++
.../qpid/jms/transports/TransportOptions.java | 19 +++++
.../jms/transports/netty/NettyTcpTransport.java | 13 +++-
.../transports/netty/NettyTcpTransportTest.java | 76 +++++++++++++++++++-
qpid-jms-docs/Configuration.md | 1 +
6 files changed, 117 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/86b678e8/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 28ea51e..7214929 100644
--- a/pom.xml
+++ b/pom.xml
@@ -73,6 +73,7 @@
<argLine>-Xmx2g -enableassertions ${jacoco-config}</argLine>
<netty-transport-native-epoll-classifier>linux-x86_64</netty-transport-native-epoll-classifier>
+ <netty-transport-native-kqueue-classifier>osx-x86_64</netty-transport-native-kqueue-classifier>
</properties>
<issueManagement>
@@ -148,6 +149,12 @@
</dependency>
<dependency>
<groupId>io.netty</groupId>
+ <artifactId>netty-transport-native-kqueue</artifactId>
+ <version>${netty-version}</version>
+ <classifier>${netty-transport-native-kqueue-classifier}</classifier>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
<version>${netty-version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/86b678e8/qpid-jms-client/pom.xml
----------------------------------------------------------------------
diff --git a/qpid-jms-client/pom.xml b/qpid-jms-client/pom.xml
index f7fd3e6..5b578c4 100644
--- a/qpid-jms-client/pom.xml
+++ b/qpid-jms-client/pom.xml
@@ -67,6 +67,11 @@
</dependency>
<dependency>
<groupId>io.netty</groupId>
+ <artifactId>netty-transport-native-kqueue</artifactId>
+ <classifier>${netty-transport-native-kqueue-classifier}</classifier>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/86b678e8/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportOptions.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportOptions.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportOptions.java
index deecd6d..3367e6c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportOptions.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportOptions.java
@@ -31,6 +31,7 @@ public class TransportOptions implements Cloneable {
public static final int DEFAULT_CONNECT_TIMEOUT = 60000;
public static final int DEFAULT_TCP_PORT = 5672;
public static final boolean DEFAULT_USE_EPOLL = true;
+ public static final boolean DEFAULT_USE_KQUEUE = false;
public static final boolean DEFAULT_TRACE_BYTES = false;
private int sendBufferSize = DEFAULT_SEND_BUFFER_SIZE;
@@ -43,6 +44,7 @@ public class TransportOptions implements Cloneable {
private boolean tcpNoDelay = DEFAULT_TCP_NO_DELAY;
private int defaultTcpPort = DEFAULT_TCP_PORT;
private boolean useEpoll = DEFAULT_USE_EPOLL;
+ private boolean useKQueue = DEFAULT_USE_KQUEUE;
private boolean traceBytes = DEFAULT_TRACE_BYTES;
/**
@@ -184,6 +186,23 @@ public class TransportOptions implements Cloneable {
}
/**
+ * @return the true if use of of the netty kqueue transport is used.
+ */
+ public boolean isUseKQueue() {
+ return useKQueue;
+ }
+
+ /**
+ * Determines if the netty kqueue transport can be used if available on this platform.
+ *
+ * @param useKQueue
+ * should use of available kqueue transport be used.
+ */
+ public void setUseKQueue(boolean useKQueue) {
+ this.useKQueue = useKQueue;
+ }
+
+ /**
* @return true if the transport should enable byte tracing
*/
public boolean isTraceBytes() {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/86b678e8/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
index da77be8..58ee073 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
@@ -50,6 +50,9 @@ import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.kqueue.KQueue;
+import io.netty.channel.kqueue.KQueueEventLoopGroup;
+import io.netty.channel.kqueue.KQueueSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;
@@ -139,9 +142,13 @@ public class NettyTcpTransport implements Transport {
sslHandler = null;
}
+ boolean useKQueue = getTransportOptions().isUseKQueue() && KQueue.isAvailable();
boolean useEpoll = getTransportOptions().isUseEpoll() && Epoll.isAvailable();
- if (useEpoll) {
+ if (useKQueue) {
+ LOG.trace("Netty Transport using KQueue mode");
+ group = new KQueueEventLoopGroup(1);
+ } else if (useEpoll) {
LOG.trace("Netty Transport using Epoll mode");
group = new EpollEventLoopGroup(1);
} else {
@@ -151,7 +158,9 @@ public class NettyTcpTransport implements Transport {
bootstrap = new Bootstrap();
bootstrap.group(group);
- if (useEpoll) {
+ if (useKQueue) {
+ bootstrap.channel(KQueueSocketChannel.class);
+ } else if (useEpoll) {
bootstrap.channel(EpollSocketChannel.class);
} else {
bootstrap.channel(NioSocketChannel.class);
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/86b678e8/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
index af354b2..68a5cb5 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
@@ -31,6 +31,9 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.kqueue.KQueue;
+import io.netty.channel.kqueue.KQueueEventLoopGroup;
import org.apache.qpid.jms.test.QpidJmsTestCase;
import org.apache.qpid.jms.test.Wait;
import org.apache.qpid.jms.transports.Transport;
@@ -43,7 +46,6 @@ import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
-import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.util.ResourceLeakDetector;
import io.netty.util.ResourceLeakDetector.Level;
@@ -525,7 +527,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
TransportOptions options = createClientOptions();
options.setUseEpoll(useEpoll);
-
+ options.setUseKQueue(false);
Transport transport = createTransport(serverLocation, testListener, options);
try {
transport.connect(null);
@@ -574,6 +576,76 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
}
}
+ @Test(timeout = 60 * 1000)
+ public void testConnectToServerWithKQueueEnabled() throws Exception {
+ doTestKQueueSupport(true);
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testConnectToServerWithKQueueDisabled() throws Exception {
+ doTestKQueueSupport(false);
+ }
+
+ private void doTestKQueueSupport(boolean useKQueue) throws Exception {
+ assumeTrue(KQueue.isAvailable());
+
+ try (NettyEchoServer server = createEchoServer(createServerOptions())) {
+ server.start();
+
+ int port = server.getServerPort();
+ URI serverLocation = new URI("tcp://localhost:" + port);
+
+ TransportOptions options = createClientOptions();
+ options.setUseKQueue(useKQueue);
+ options.setUseEpoll(false);
+ Transport transport = createTransport(serverLocation, testListener, options);
+ try {
+ transport.connect(null);
+ LOG.info("Connected to server:{} as expected.", serverLocation);
+ } catch (Exception e) {
+ fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
+ }
+
+ assertTrue(transport.isConnected());
+ assertEquals(serverLocation, transport.getRemoteLocation());
+ assertKQueue("Transport should be using Kqueue", useKQueue, transport);
+
+ transport.close();
+
+ // Additional close should not fail or cause other problems.
+ transport.close();
+ }
+
+ assertTrue(!transportClosed); // Normal shutdown does not trigger the event.
+ assertTrue(exceptions.isEmpty());
+ assertTrue(data.isEmpty());
+ }
+
+ private void assertKQueue(String message, boolean expected, Transport transport) throws Exception {
+ Field group = null;
+ Class<?> transportType = transport.getClass();
+
+ while (transportType != null && group == null) {
+ try {
+ group = transportType.getDeclaredField("group");
+ } catch (NoSuchFieldException error) {
+ transportType = transportType.getSuperclass();
+ if (Object.class.equals(transportType)) {
+ transportType = null;
+ }
+ }
+ }
+
+ assertNotNull("Transport implementation unknown", group);
+
+ group.setAccessible(true);
+ if (expected) {
+ assertTrue(message, group.get(transport) instanceof KQueueEventLoopGroup);
+ } else {
+ assertFalse(message, group.get(transport) instanceof KQueueEventLoopGroup);
+ }
+ }
+
protected Transport createTransport(URI serverLocation, TransportListener listener, TransportOptions options) {
if (listener == null) {
return new NettyTcpTransport(serverLocation, options);
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/86b678e8/qpid-jms-docs/Configuration.md
----------------------------------------------------------------------
diff --git a/qpid-jms-docs/Configuration.md b/qpid-jms-docs/Configuration.md
index 6e5ee81..b950b5c 100644
--- a/qpid-jms-docs/Configuration.md
+++ b/qpid-jms-docs/Configuration.md
@@ -180,6 +180,7 @@ The complete set of SSL Transport options is listed below:
+ **transport.verifyHost** Whether to verify that the hostname being connected to matches with the provided server certificate. Defaults to true.
+ **transport.keyAlias** The alias to use when selecting a keypair from the keystore if required to send a client certificate to the server. No default.
+ **transport.useEpoll** When true the transport will use the native Epoll layer when available instead of the NIO layer, which can improve performance. Defaults to true.
++ **transport.useKQueue** When true the transport will use the native KQueue layer when available instead of the NIO layer, which can improve performance. Defaults to false.
### Websocket Transport Configuration options
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org