You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/08/09 19:32:45 UTC

[2/2] activemq-artemis git commit: ARTEMIS-1342: Support Netty Native KQueue on macOS

ARTEMIS-1342: Support Netty Native KQueue on macOS

Add support for KQueue when the server or client runs on macOS. This is in line with the existing epoll support on Linux.

Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/0bc55100
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/0bc55100
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/0bc55100

Branch: refs/heads/master
Commit: 0bc55100592453cbfd2fe1359dcd8d96b347d58b
Parents: 687e318
Author: Michael Andre Pearce <Mi...@me.com>
Authored: Wed Aug 9 17:43:40 2017 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Aug 9 15:23:16 2017 -0400

----------------------------------------------------------------------
 .../org/apache/activemq/artemis/utils/Env.java  |  5 ++
 .../core/client/ActiveMQClientLogger.java       |  5 ++
 .../artemis/core/remoting/impl/netty/Epoll.java |  6 +--
 .../core/remoting/impl/netty/KQueue.java        | 51 ++++++++++++++++++++
 .../remoting/impl/netty/NettyConnector.java     | 14 ++++++
 .../remoting/impl/netty/TransportConstants.java |  6 +++
 .../src/main/resources/features.xml             |  1 +
 .../core/remoting/impl/netty/NettyAcceptor.java | 17 +++++++
 .../main/resources/servers/expire/broker.xml    | 11 +++--
 9 files changed, 108 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0bc55100/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/Env.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/Env.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/Env.java
index 94f69d3..cd41bef 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/Env.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/Env.java
@@ -61,6 +61,7 @@ public final class Env {
 
    private static final String OS = System.getProperty("os.name").toLowerCase();
    private static final boolean IS_LINUX = OS.startsWith("linux");
+   private static final boolean IS_MAC = OS.startsWith("mac");
    private static final boolean IS_64BIT = checkIs64bit();
 
    private Env() {
@@ -87,6 +88,10 @@ public final class Env {
       return IS_LINUX == true;
    }
 
+   public static boolean isMacOs() { // true when the lower-cased "os.name" system property starts with "mac"
+      return IS_MAC == true;
+   }
+
    public static boolean is64BitJvm() {
       return IS_64BIT;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0bc55100/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
index 6fbb911..9814d88 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
@@ -536,4 +536,9 @@ public interface ActiveMQClientLogger extends BasicLogger {
    @Message(id = 214033, value = "Cannot resolve host ",
            format = Message.Format.MESSAGE_FORMAT)
    void unableToResolveHost(@Cause UnknownHostException e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 214034, value = "Unable to check KQueue availability ",
+       format = Message.Format.MESSAGE_FORMAT)
+   void unableToCheckKQueueAvailability(@Cause Throwable e);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0bc55100/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/Epoll.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/Epoll.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/Epoll.java
index 8553d7f..8779a5d 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/Epoll.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/Epoll.java
@@ -25,9 +25,9 @@ import org.apache.activemq.artemis.utils.Env;
  */
 public final class Epoll {
 
-   private static final boolean IS_AVAILABLE_EPOLL = isIsAvailableEpoll();
+   private static final boolean IS_EPOLL_AVAILABLE = isEpollAvailable();
 
-   private static boolean isIsAvailableEpoll() {
+   private static boolean isEpollAvailable() {
       try {
          if (Env.is64BitJvm() && Env.isLinuxOs()) {
             return io.netty.channel.epoll.Epoll.isAvailable();
@@ -46,6 +46,6 @@ public final class Epoll {
    }
 
    public static boolean isAvailable() {
-      return IS_AVAILABLE_EPOLL;
+      return IS_EPOLL_AVAILABLE;
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0bc55100/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/KQueue.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/KQueue.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/KQueue.java
new file mode 100644
index 0000000..d2adae3
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/KQueue.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.remoting.impl.netty;
+
+import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
+import org.apache.activemq.artemis.utils.Env;
+
+/**
+ * Tells if <a href="http://netty.io/wiki/native-transports.html">{@code netty-transport-native-kqueue}</a> is supported.
+ */
+public final class KQueue {
+
+   private static final boolean IS_KQUEUE_AVAILABLE = isKQueueAvailable(); // probed once, when this class is initialized
+
+   private static boolean isKQueueAvailable() {
+      try {
+         if (Env.is64BitJvm() && Env.isMacOs()) { // only ask Netty when running a 64-bit JVM on macOS
+            return io.netty.channel.kqueue.KQueue.isAvailable();
+         } else {
+            return false;
+         }
+      } catch (Throwable e) { // any failure of the probe is logged as a warning and treated as "not available"
+         ActiveMQClientLogger.LOGGER.unableToCheckKQueueAvailability(e);
+         return false;
+      }
+
+   }
+
+   private KQueue() { // static-only holder; not meant to be instantiated
+
+   }
+
+   public static boolean isAvailable() { // returns the cached result of the one-time probe
+      return IS_KQUEUE_AVAILABLE;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0bc55100/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
index 8e48cf9..aaf0b08 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
@@ -65,6 +65,8 @@ import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.channel.epoll.EpollSocketChannel;
 import io.netty.channel.group.ChannelGroup;
 import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.kqueue.KQueueEventLoopGroup;
+import io.netty.channel.kqueue.KQueueSocketChannel;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.handler.codec.base64.Base64;
@@ -232,6 +234,8 @@ public class NettyConnector extends AbstractConnector {
 
    private boolean useEpoll;
 
+   private boolean useKQueue;
+
    private int remotingThreads;
 
    private boolean useGlobalWorkerPool;
@@ -309,6 +313,7 @@ public class NettyConnector extends AbstractConnector {
       useGlobalWorkerPool = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_GLOBAL_WORKER_POOL_PROP_NAME, useGlobalWorkerPool, configuration);
 
       useEpoll = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_EPOLL_PROP_NAME, TransportConstants.DEFAULT_USE_EPOLL, configuration);
+      useKQueue = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_KQUEUE_PROP_NAME, TransportConstants.DEFAULT_USE_KQUEUE, configuration);
 
       useServlet = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_SERVLET_PROP_NAME, TransportConstants.DEFAULT_USE_SERVLET, configuration);
       host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, configuration);
@@ -415,6 +420,15 @@ public class NettyConnector extends AbstractConnector {
 
          channelClazz = EpollSocketChannel.class;
          logger.debug("Connector " + this + " using native epoll");
+      } else if (useKQueue && KQueue.isAvailable()) {
+         if (useGlobalWorkerPool) {
+            group = SharedEventLoopGroup.getInstance((threadFactory -> new KQueueEventLoopGroup(remotingThreads, threadFactory)));
+         } else {
+            group = new KQueueEventLoopGroup(remotingThreads);
+         }
+
+         channelClazz = KQueueSocketChannel.class;
+         logger.debug("Connector " + this + " using native kqueue");
       } else {
          if (useGlobalWorkerPool) {
             channelClazz = NioSocketChannel.class;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0bc55100/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
index 5288f38..646de80 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
@@ -53,6 +53,8 @@ public class TransportConstants {
 
    public static final String USE_EPOLL_PROP_NAME = "useEpoll";
 
+   public static final String USE_KQUEUE_PROP_NAME = "useKQueue";
+
    @Deprecated
    /**
     * @deprecated Use USE_GLOBAL_WORKER_POOL_PROP_NAME
@@ -157,6 +159,8 @@ public class TransportConstants {
 
    public static final boolean DEFAULT_USE_EPOLL = true;
 
+   public static final boolean DEFAULT_USE_KQUEUE = true;
+
    public static final boolean DEFAULT_USE_INVM = false;
 
    public static final boolean DEFAULT_USE_SERVLET = false;
@@ -255,6 +259,7 @@ public class TransportConstants {
       allowableAcceptorKeys.add(TransportConstants.HTTP_UPGRADE_ENABLED_PROP_NAME);
       allowableAcceptorKeys.add(TransportConstants.USE_NIO_PROP_NAME);
       allowableAcceptorKeys.add(TransportConstants.USE_EPOLL_PROP_NAME);
+      allowableAcceptorKeys.add(TransportConstants.USE_KQUEUE_PROP_NAME);
       allowableAcceptorKeys.add(TransportConstants.USE_INVM_PROP_NAME);
       //noinspection deprecation
       allowableAcceptorKeys.add(TransportConstants.PROTOCOL_PROP_NAME);
@@ -309,6 +314,7 @@ public class TransportConstants {
       allowableConnectorKeys.add(TransportConstants.USE_NIO_PROP_NAME);
       allowableConnectorKeys.add(TransportConstants.USE_NIO_GLOBAL_WORKER_POOL_PROP_NAME);
       allowableConnectorKeys.add(TransportConstants.USE_EPOLL_PROP_NAME);
+      allowableConnectorKeys.add(TransportConstants.USE_KQUEUE_PROP_NAME);
       allowableConnectorKeys.add(TransportConstants.USE_GLOBAL_WORKER_POOL_PROP_NAME);
       allowableConnectorKeys.add(TransportConstants.HOST_PROP_NAME);
       allowableConnectorKeys.add(TransportConstants.PORT_PROP_NAME);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0bc55100/artemis-features/src/main/resources/features.xml
----------------------------------------------------------------------
diff --git a/artemis-features/src/main/resources/features.xml b/artemis-features/src/main/resources/features.xml
index cf5a2a9..bae4a4c 100644
--- a/artemis-features/src/main/resources/features.xml
+++ b/artemis-features/src/main/resources/features.xml
@@ -36,6 +36,7 @@
 		<bundle>mvn:io.netty/netty-codec/${netty.version}</bundle>
 		<bundle>mvn:io.netty/netty-handler/${netty.version}</bundle>
 		<bundle>mvn:io.netty/netty-transport-native-epoll/${netty.version}</bundle>
+		<bundle>mvn:io.netty/netty-transport-native-kqueue/${netty.version}</bundle>
 		<bundle>mvn:io.netty/netty-transport-native-unix-common/${netty.version}</bundle>
 	</feature>
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0bc55100/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
index b41fc70..2477bfc 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
@@ -55,6 +55,8 @@ import io.netty.channel.epoll.EpollServerSocketChannel;
 import io.netty.channel.group.ChannelGroup;
 import io.netty.channel.group.ChannelGroupFuture;
 import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.kqueue.KQueueEventLoopGroup;
+import io.netty.channel.kqueue.KQueueServerSocketChannel;
 import io.netty.channel.local.LocalAddress;
 import io.netty.channel.local.LocalServerChannel;
 import io.netty.channel.nio.NioEventLoopGroup;
@@ -96,6 +98,7 @@ public class NettyAcceptor extends AbstractAcceptor {
    public static String INVM_ACCEPTOR_TYPE = "IN-VM";
    public static String NIO_ACCEPTOR_TYPE = "NIO";
    public static String EPOLL_ACCEPTOR_TYPE = "EPOLL";
+   public static String KQUEUE_ACCEPTOR_TYPE = "KQUEUE";
 
    static {
       // Disable default Netty leak detection if the Netty leak detection level system properties are not in use
@@ -130,6 +133,8 @@ public class NettyAcceptor extends AbstractAcceptor {
 
    private final boolean useEpoll;
 
+   private final boolean useKQueue;
+
    private final ProtocolHandler protocolHandler;
 
    private final String host;
@@ -228,6 +233,7 @@ public class NettyAcceptor extends AbstractAcceptor {
       remotingThreads = ConfigurationHelper.getIntProperty(TransportConstants.REMOTING_THREADS_PROPNAME, remotingThreads, configuration);
 
       useEpoll = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_EPOLL_PROP_NAME, TransportConstants.DEFAULT_USE_EPOLL, configuration);
+      useKQueue = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_KQUEUE_PROP_NAME, TransportConstants.DEFAULT_USE_KQUEUE, configuration);
 
       backlog = ConfigurationHelper.getIntProperty(TransportConstants.BACKLOG_PROP_NAME, -1, configuration);
       useInvm = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_INVM_PROP_NAME, TransportConstants.DEFAULT_USE_INVM, configuration);
@@ -318,6 +324,17 @@ public class NettyAcceptor extends AbstractAcceptor {
             acceptorType = EPOLL_ACCEPTOR_TYPE;
 
             logger.debug("Acceptor using native epoll");
+         } else if (useKQueue && KQueue.isAvailable()) {
+            channelClazz = KQueueServerSocketChannel.class;
+            eventLoopGroup = new KQueueEventLoopGroup(remotingThreads, AccessController.doPrivileged(new PrivilegedAction<ActiveMQThreadFactory>() {
+               @Override
+               public ActiveMQThreadFactory run() {
+                  return new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader());
+               }
+            }));
+            acceptorType = KQUEUE_ACCEPTOR_TYPE;
+
+            logger.debug("Acceptor using native kqueue");
          } else {
             channelClazz = NioServerSocketChannel.class;
             eventLoopGroup = new NioEventLoopGroup(remotingThreads, AccessController.doPrivileged(new PrivilegedAction<ActiveMQThreadFactory>() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0bc55100/tests/smoke-tests/src/main/resources/servers/expire/broker.xml
----------------------------------------------------------------------
diff --git a/tests/smoke-tests/src/main/resources/servers/expire/broker.xml b/tests/smoke-tests/src/main/resources/servers/expire/broker.xml
index a4176f8..0930296 100644
--- a/tests/smoke-tests/src/main/resources/servers/expire/broker.xml
+++ b/tests/smoke-tests/src/main/resources/servers/expire/broker.xml
@@ -98,23 +98,24 @@ under the License.
       <acceptors>
 
          <!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it -->
+         <!-- useKQueue means: it will use Netty kqueue if you are on a system (macOS) that supports it -->
          <!-- amqpCredits: The number of credits sent to AMQP producers -->
          <!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark -->
 
          <!-- Acceptor for every supported protocol -->
-         <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+         <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;useKQueue=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
 
          <!-- AMQP Acceptor.  Listens on default AMQP port for AMQP traffic.-->
-         <acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300</acceptor>
+         <acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;useKQueue=true;amqpCredits=1000;amqpMinCredits=300</acceptor>
 
          <!-- STOMP Acceptor. -->
-         <acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
+         <acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true;useKQueue=true</acceptor>
 
          <!-- HornetQ Compatibility Acceptor.  Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
-         <acceptor name="hornetq">tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP;useEpoll=true</acceptor>
+         <acceptor name="hornetq">tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP;useEpoll=true;useKQueue=true</acceptor>
 
          <!-- MQTT Acceptor -->
-         <acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
+         <acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true;useKQueue=true</acceptor>
 
       </acceptors>