Posted to dev@qpid.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2022/03/02 12:55:00 UTC

[jira] [Commented] (QPIDJMS-553) Shared Netty event loop group

    [ https://issues.apache.org/jira/browse/QPIDJMS-553?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17500125#comment-17500125 ] 

ASF GitHub Bot commented on QPIDJMS-553:
----------------------------------------

gemmellr commented on a change in pull request #45:
URL: https://github.com/apache/qpid-jms/pull/45#discussion_r816735224



##########
File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyEventLoopGroupPoolFactory.java
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.netty;
+
+import java.util.Objects;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.Future;
+import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.util.QpidJMSThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class NettyEventLoopGroupPoolFactory {
+
+   private NettyEventLoopGroupPoolFactory() {
+
+   }
+
+   public interface Ref<T> extends AutoCloseable {
+
+      T ref();
+
+      @Override
+      void close();
+   }
+
+   public interface EventLoopGroupPool {
+
+      Ref<EventLoopGroup> sharedGroupWith(int threads);
+
+   }
+
+   private static final Logger LOG = LoggerFactory.getLogger(NettyEventLoopGroupPool.class);
+   private static final int SHUTDOWN_TIMEOUT = 50;
+
+   private static EventLoopGroup createEventLoopGroup(final TransportOptions transportOptions,
+                                                      final ThreadFactory ioThreadFactory) {
+      final int threads = transportOptions.getSharedEventLoopThreads() > 0 ? transportOptions.getSharedEventLoopThreads() : 1;
+      final boolean useKQueue = KQueueSupport.isAvailable(transportOptions);
+      final boolean useEpoll = EpollSupport.isAvailable(transportOptions);
+      if (useKQueue) {
+         return createEventLoopGroup(threads, NettyEventLoopGroupPool.KQUEUE, ioThreadFactory);
+      }
+      if (useEpoll) {
+         return createEventLoopGroup(threads, NettyEventLoopGroupPool.EPOLL, ioThreadFactory);
+      }
+      return createEventLoopGroup(threads, NettyEventLoopGroupPool.NIO, ioThreadFactory);
+   }
+
+   private static EventLoopGroup createEventLoopGroup(final int threads,
+                                                      final NettyEventLoopGroupPool poolType,
+                                                      final ThreadFactory ioThreadFactory) {
+      switch (Objects.requireNonNull(poolType)) {
+
+         case EPOLL:
+            LOG.trace("Netty Transport using Epoll mode");
+            return EpollSupport.createGroup(threads, ioThreadFactory);
+         case KQUEUE:
+            LOG.trace("Netty Transport using KQueue mode");
+            return KQueueSupport.createGroup(threads, ioThreadFactory);
+         case NIO:
+            LOG.trace("Netty Transport using Nio mode");
+            return new NioEventLoopGroup(threads, ioThreadFactory);
+         default:
+            throw new AssertionError("unexpected poolType: " + poolType);
+      }
+   }
+
+   public static Ref<EventLoopGroup> unsharedGroupWith(final TransportOptions transportOptions,
+                                                       final ThreadFactory threadFactory) {
+
+      final EventLoopGroup ref = createEventLoopGroup(transportOptions, threadFactory);
+
+      return new Ref<>() {
+         @Override
+         public EventLoopGroup ref() {
+            return ref;
+         }
+
+         @Override
+         public void close() {
+            Future<?> fut = ref.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+            if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) {
+               LOG.trace("Channel group shutdown failed to complete in allotted time");
+            }
+         }
+      };
+   }
+
+   private static final class AtomicCloseableRef<T> implements Ref<T> {
+
+      private final Ref<T> ref;
+      private final AtomicBoolean closed;
+
+      public AtomicCloseableRef(final Ref<T> ref) {
+         this.ref = ref;
+         this.closed = new AtomicBoolean();
+      }
+
+      @Override
+      public T ref() {
+         return ref.ref();
+      }
+
+      @Override
+      public void close() {
+         if (closed.compareAndSet(false, true)) {
+            ref.close();
+         }
+      }
+   }
+
+   public static EventLoopGroupPool sharedGroupPoolWith(final TransportOptions transportOptions) {
+      if (transportOptions.getSharedEventLoopThreads() <= 0) {
+         throw new IllegalArgumentException("sharedEventLoopThreads must be > 0");
+      }
+      final boolean useKQueue = KQueueSupport.isAvailable(transportOptions);
+      final boolean useEpoll = EpollSupport.isAvailable(transportOptions);
+      if (useKQueue) {
+         return NettyEventLoopGroupPool.KQUEUE;
+      }
+      if (useEpoll) {
+         return NettyEventLoopGroupPool.EPOLL;
+      }
+      return NettyEventLoopGroupPool.NIO;
+   }
+
+   private enum NettyEventLoopGroupPool implements EventLoopGroupPool {
+      EPOLL, KQUEUE, NIO;
+
+      private static final AtomicInteger SHARED_NETTY_EVENT_LOOP_GROUP_SEQUENCE = new AtomicInteger(0);
+
+      private final class SharedGroupRef implements Ref<EventLoopGroup> {
+
+         private final EventLoopGroup group;
+         private final AtomicInteger refCnt;
+         private final int threads;
+
+         private SharedGroupRef(final EventLoopGroup group, final int threads) {
+            this.threads = threads;
+            this.group = group;
+            refCnt = new AtomicInteger(1);
+         }
+
+         public int getThreads() {
+            if (refCnt.get() == 0) {
+               throw new IllegalStateException("the event loop group cannot be reused");
+            }
+            return threads;
+         }
+
+         public boolean retain() {
+            while (true) {
+               final int currValue = refCnt.get();
+               if (currValue == 0) {
+                  // this has been already disposed!
+                  return false;
+               }
+               if (refCnt.compareAndSet(currValue, currValue + 1)) {
+                  return true;
+               }
+            }
+         }
+
+         @Override
+         public EventLoopGroup ref() {
+            if (refCnt.get() == 0) {
+               throw new IllegalStateException("the event loop group cannot be reused");
+            }
+            return group;
+         }
+
+         @Override
+         public void close() {
+            while (true) {
+               final int currValue = refCnt.get();
+               if (currValue == 0) {
+                  return;
+               }
+               if (refCnt.compareAndSet(currValue, currValue - 1)) {
+                  if (currValue == 1) {
+                     // if a racing thread has borrowed this lease, it would try already to set it to a new value:
+                     // this one is a best effort cleanup to help GC
+                     sharedEventLoopGroup.compareAndSet(this, null);
+                     Future<?> fut = group.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+                     if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) {
+                        LOG.trace("Channel group shutdown failed to complete in allotted time");
+                     }
+                  }
+                  return;
+               }
+            }
+         }
+      }
+
+      private final AtomicReference<SharedGroupRef> sharedEventLoopGroup = new AtomicReference<>(null);
+
+      @Override
+      public Ref<EventLoopGroup> sharedGroupWith(final int threads) {
+         if (threads <= 0) {
+            throw new IllegalArgumentException("threads must be > 0");
+         }
+         while (true) {
+            SharedGroupRef sharedGroupRef = sharedEventLoopGroup.get();
+            if (sharedGroupRef != null && sharedGroupRef.retain()) {
+               validateSharedRef(sharedGroupRef, threads);
+               return new AtomicCloseableRef<>(sharedGroupRef);
+            }
+            synchronized (this) {
+               sharedGroupRef = sharedEventLoopGroup.get();
+               if (sharedGroupRef != null && sharedGroupRef.retain()) {
+                  validateSharedRef(sharedGroupRef, threads);
+                  return new AtomicCloseableRef<>(sharedGroupRef);
+               }
+               sharedGroupRef = new SharedGroupRef(createEventLoopGroup(threads, this, new QpidJMSThreadFactory("SharedNettyEventLoopGroup :(" + SHARED_NETTY_EVENT_LOOP_GROUP_SEQUENCE.incrementAndGet() + ")", true)), threads);
+               sharedEventLoopGroup.set(sharedGroupRef);

Review comment:
       It looks like this factory will give every thread in the group the same name. So in addition to no longer being named for the connection it is operating for, a thread can't even be distinguished from the others in the pool by its name.
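
   One way to keep the threads distinguishable is sketched below. This is a minimal sketch, not the PR's code: IndexedThreadFactory and the "-thread-N" suffix are hypothetical names chosen for illustration, and nothing here describes how QpidJMSThreadFactory itself behaves.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper (not part of the PR or of qpid-jms): appends a per-thread
// index to a group name so each thread created by the factory has its own name.
final class IndexedThreadFactory implements ThreadFactory {

    private final String groupName;
    private final boolean daemon;
    private final AtomicInteger threadIndex = new AtomicInteger();

    IndexedThreadFactory(String groupName, boolean daemon) {
        this.groupName = groupName;
        this.daemon = daemon;
    }

    @Override
    public Thread newThread(Runnable task) {
        // The counter is shared by all threads of this factory, so names never repeat.
        Thread thread = new Thread(task, groupName + "-thread-" + threadIndex.incrementAndGet());
        thread.setDaemon(daemon);
        return thread;
    }

    public static void main(String[] args) {
        ThreadFactory factory = new IndexedThreadFactory("SharedNettyEventLoopGroup-1", true);
        // Threads are only created here, not started; we just print their names.
        System.out.println(factory.newThread(() -> { }).getName());
        System.out.println(factory.newThread(() -> { }).getName());
    }
}
```

   A factory along these lines could be handed to whatever creates the shared group, so that a thread dump shows which thread of which shared group is which.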

##########
File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyEventLoopGroupPoolFactory.java
##########
@@ -0,0 +1,253 @@
+package org.apache.qpid.jms.transports.netty;
+
+import java.util.Objects;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.Future;
+import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.util.QpidJMSThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class NettyEventLoopGroupPoolFactory {
+
+   private NettyEventLoopGroupPoolFactory() {
+
+   }
+
+   public interface Ref<T> extends AutoCloseable {
+
+      T ref();
+
+      @Override
+      void close();
+   }
+
+   public interface EventLoopGroupPool {
+
+      Ref<EventLoopGroup> sharedGroupWith(int threads);
+
+   }
+
+   private static final Logger LOG = LoggerFactory.getLogger(NettyEventLoopGroupPool.class);
+   private static final int SHUTDOWN_TIMEOUT = 50;
+
+   private static EventLoopGroup createEventLoopGroup(final TransportOptions transportOptions,
+                                                      final ThreadFactory ioThreadFactory) {
+      final int threads = transportOptions.getSharedEventLoopThreads() > 0 ? transportOptions.getSharedEventLoopThreads() : 1;
+      final boolean useKQueue = KQueueSupport.isAvailable(transportOptions);
+      final boolean useEpoll = EpollSupport.isAvailable(transportOptions);
+      if (useKQueue) {
+         return createEventLoopGroup(threads, NettyEventLoopGroupPool.KQUEUE, ioThreadFactory);
+      }
+      if (useEpoll) {
+         return createEventLoopGroup(threads, NettyEventLoopGroupPool.EPOLL, ioThreadFactory);
+      }
+      return createEventLoopGroup(threads, NettyEventLoopGroupPool.NIO, ioThreadFactory);

Review comment:
       It feels odd for this bit of independent code to also decide what is used, when that is already decided elsewhere for other bits that must match. It would be simpler, and more likely to stay consistent, to make the decision elsewhere in one place and pass it in here. That would also allow removing the third duplication of this decision lower in this class.
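
   A rough sketch of what "decide once and pass it in" could look like follows. All names here (TransportSelection, EventLoopType, select, describe) are hypothetical, and the two boolean flags merely stand in for the results of KQueueSupport.isAvailable(...) and EpollSupport.isAvailable(...); it is an illustration under those assumptions, not a proposal for the exact API.

```java
// Hypothetical sketch: the transport type is chosen in exactly one place and
// then passed to whatever needs it, instead of being re-derived each time.
final class TransportSelection {

    enum EventLoopType {
        EPOLL, KQUEUE, NIO;

        // Same precedence as the PR code: KQueue first, then Epoll, otherwise NIO.
        // The flags are assumed to come from the existing availability checks.
        static EventLoopType select(boolean kqueueAvailable, boolean epollAvailable) {
            if (kqueueAvailable) {
                return KQUEUE;
            }
            if (epollAvailable) {
                return EPOLL;
            }
            return NIO;
        }
    }

    // Stand-in for group creation: it receives the decision and never repeats it.
    static String describe(EventLoopType type, int threads) {
        switch (type) {
            case EPOLL:
                return "Epoll group with " + threads + " thread(s)";
            case KQUEUE:
                return "KQueue group with " + threads + " thread(s)";
            case NIO:
                return "Nio group with " + threads + " thread(s)";
            default:
                throw new AssertionError("unexpected type: " + type);
        }
    }

    public static void main(String[] args) {
        // The decision is made once by the caller...
        EventLoopType type = EventLoopType.select(false, true);
        // ...and every consumer (group creation, channel class lookup, pool
        // lookup) would be handed the same value.
        System.out.println(describe(type, 2));
    }
}
```

   With this shape, the group, the channel class and the shared pool cannot disagree about the transport type, because none of them evaluates the availability checks on its own.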

##########
File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyEventLoopGroupPoolFactory.java
##########
@@ -0,0 +1,253 @@
+package org.apache.qpid.jms.transports.netty;
+
+import java.util.Objects;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.Future;
+import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.util.QpidJMSThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class NettyEventLoopGroupPoolFactory {
+
+   private NettyEventLoopGroupPoolFactory() {
+
+   }
+
+   public interface Ref<T> extends AutoCloseable {
+
+      T ref();
+
+      @Override
+      void close();
+   }
+
+   public interface EventLoopGroupPool {
+
+      Ref<EventLoopGroup> sharedGroupWith(int threads);
+
+   }
+
+   private static final Logger LOG = LoggerFactory.getLogger(NettyEventLoopGroupPool.class);
+   private static final int SHUTDOWN_TIMEOUT = 50;
+
+   private static EventLoopGroup createEventLoopGroup(final TransportOptions transportOptions,
+                                                      final ThreadFactory ioThreadFactory) {
+      final int threads = transportOptions.getSharedEventLoopThreads() > 0 ? transportOptions.getSharedEventLoopThreads() : 1;
+      final boolean useKQueue = KQueueSupport.isAvailable(transportOptions);
+      final boolean useEpoll = EpollSupport.isAvailable(transportOptions);
+      if (useKQueue) {
+         return createEventLoopGroup(threads, NettyEventLoopGroupPool.KQUEUE, ioThreadFactory);
+      }
+      if (useEpoll) {
+         return createEventLoopGroup(threads, NettyEventLoopGroupPool.EPOLL, ioThreadFactory);
+      }
+      return createEventLoopGroup(threads, NettyEventLoopGroupPool.NIO, ioThreadFactory);
+   }
+
+   private static EventLoopGroup createEventLoopGroup(final int threads,
+                                                      final NettyEventLoopGroupPool poolType,
+                                                      final ThreadFactory ioThreadFactory) {
+      switch (Objects.requireNonNull(poolType)) {
+
+         case EPOLL:
+            LOG.trace("Netty Transport using Epoll mode");
+            return EpollSupport.createGroup(threads, ioThreadFactory);
+         case KQUEUE:
+            LOG.trace("Netty Transport using KQueue mode");
+            return KQueueSupport.createGroup(threads, ioThreadFactory);
+         case NIO:
+            LOG.trace("Netty Transport using Nio mode");
+            return new NioEventLoopGroup(threads, ioThreadFactory);
+         default:
+            throw new AssertionError("unexpected poolType: " + poolType);
+      }
+   }
+
+   public static Ref<EventLoopGroup> unsharedGroupWith(final TransportOptions transportOptions,
+                                                       final ThreadFactory threadFactory) {
+
+      final EventLoopGroup ref = createEventLoopGroup(transportOptions, threadFactory);
+
+      return new Ref<>() {
+         @Override
+         public EventLoopGroup ref() {
+            return ref;
+         }
+
+         @Override
+         public void close() {
+            Future<?> fut = ref.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+            if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) {
+               LOG.trace("Channel group shutdown failed to complete in allotted time");
+            }
+         }
+      };
+   }
+
+   private static final class AtomicCloseableRef<T> implements Ref<T> {
+
+      private final Ref<T> ref;
+      private final AtomicBoolean closed;
+
+      public AtomicCloseableRef(final Ref<T> ref) {
+         this.ref = ref;
+         this.closed = new AtomicBoolean();
+      }
+
+      @Override
+      public T ref() {
+         return ref.ref();
+      }
+
+      @Override
+      public void close() {
+         if (closed.compareAndSet(false, true)) {
+            ref.close();
+         }
+      }
+   }
+
+   public static EventLoopGroupPool sharedGroupPoolWith(final TransportOptions transportOptions) {
+      if (transportOptions.getSharedEventLoopThreads() <= 0) {
+         throw new IllegalArgumentException("sharedEventLoopThreads must be > 0");
+      }
+      final boolean useKQueue = KQueueSupport.isAvailable(transportOptions);
+      final boolean useEpoll = EpollSupport.isAvailable(transportOptions);
+      if (useKQueue) {
+         return NettyEventLoopGroupPool.KQUEUE;
+      }
+      if (useEpoll) {
+         return NettyEventLoopGroupPool.EPOLL;
+      }
+      return NettyEventLoopGroupPool.NIO;
+   }
+
+   private enum NettyEventLoopGroupPool implements EventLoopGroupPool {
+      EPOLL, KQUEUE, NIO;
+
+      private static final AtomicInteger SHARED_NETTY_EVENT_LOOP_GROUP_SEQUENCE = new AtomicInteger(0);
+
+      private final class SharedGroupRef implements Ref<EventLoopGroup> {
+
+         private final EventLoopGroup group;
+         private final AtomicInteger refCnt;
+         private final int threads;
+
+         private SharedGroupRef(final EventLoopGroup group, final int threads) {
+            this.threads = threads;
+            this.group = group;
+            refCnt = new AtomicInteger(1);
+         }
+
+         public int getThreads() {
+            if (refCnt.get() == 0) {
+               throw new IllegalStateException("the event loop group cannot be reused");
+            }
+            return threads;
+         }
+
+         public boolean retain() {
+            while (true) {
+               final int currValue = refCnt.get();
+               if (currValue == 0) {
+                  // this has been already disposed!
+                  return false;
+               }
+               if (refCnt.compareAndSet(currValue, currValue + 1)) {
+                  return true;
+               }
+            }
+         }
+
+         @Override
+         public EventLoopGroup ref() {
+            if (refCnt.get() == 0) {
+               throw new IllegalStateException("the event loop group cannot be reused");
+            }
+            return group;
+         }
+
+         @Override
+         public void close() {
+            while (true) {
+               final int currValue = refCnt.get();
+               if (currValue == 0) {
+                  return;
+               }
+               if (refCnt.compareAndSet(currValue, currValue - 1)) {
+                  if (currValue == 1) {
+                     // if a racing thread has borrowed this lease, it would try already to set it to a new value:
+                     // this one is a best effort cleanup to help GC
+                     sharedEventLoopGroup.compareAndSet(this, null);
+                     Future<?> fut = group.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+                     if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) {
+                        LOG.trace("Channel group shutdown failed to complete in allotted time");
+                     }
+                  }
+                  return;
+               }
+            }
+         }
+      }
+
+      private final AtomicReference<SharedGroupRef> sharedEventLoopGroup = new AtomicReference<>(null);
+
+      @Override
+      public Ref<EventLoopGroup> sharedGroupWith(final int threads) {
+         if (threads <= 0) {
+            throw new IllegalArgumentException("threads must be > 0");
+         }
+         while (true) {
+            SharedGroupRef sharedGroupRef = sharedEventLoopGroup.get();
+            if (sharedGroupRef != null && sharedGroupRef.retain()) {
+               validateSharedRef(sharedGroupRef, threads);
+               return new AtomicCloseableRef<>(sharedGroupRef);
+            }
+            synchronized (this) {
+               sharedGroupRef = sharedEventLoopGroup.get();
+               if (sharedGroupRef != null && sharedGroupRef.retain()) {
+                  validateSharedRef(sharedGroupRef, threads);
+                  return new AtomicCloseableRef<>(sharedGroupRef);
+               }
+               sharedGroupRef = new SharedGroupRef(createEventLoopGroup(threads, this, new QpidJMSThreadFactory("SharedNettyEventLoopGroup :(" + SHARED_NETTY_EVENT_LOOP_GROUP_SEQUENCE.incrementAndGet() + ")", true)), threads);
+               sharedEventLoopGroup.set(sharedGroupRef);
+               return new AtomicCloseableRef<>(sharedGroupRef);
+            }
+         }
+      }
+
+      private static void validateSharedRef(final SharedGroupRef sharedRef, final int threads) {
+         if (threads != sharedRef.getThreads()) {
+            try (SharedGroupRef ref = sharedRef) {
+               throw new IllegalArgumentException("threads cannot be different from " + sharedRef.getThreads());
+            }
+         }
+      }

Review comment:
       I doubt Tim will like this any more than the prior throwing when the types differed. I don't particularly either. I think it either needs to handle it somehow (e.g. distinct pools), or just proceed with what's there and warn if it's fewer (or perhaps just different) than requested.
   
   It's unclear why the assignment is in the try when its value then isn't used; did you mean to use ref instead of sharedRef in the exception message? Even if so, wouldn't that still be racy with the sharedRef use in the outer enclosing if? Or is it so that it gets closed after throwing, because it's AutoCloseable? Whatever it is, if an update doesn't make it clearer, it's probably worth a comment, unless this is just removed.
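
   On the AutoCloseable reading: Java does close a try-with-resources resource before the exception leaves the try statement, so the construct would release the reference taken by retain(). The sketch below illustrates that with a simplified stand-in; Lease, validateImplicit and validateExplicit are hypothetical names, and Lease only models the reference count, not the real SharedGroupRef.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class ReleaseOnMismatchDemo {

    // Hypothetical, simplified stand-in for SharedGroupRef: close() gives back
    // one reference. The real class also shuts the group down at zero.
    static final class Lease implements AutoCloseable {
        final AtomicInteger refCnt = new AtomicInteger(1);
        final int threads;

        Lease(int threads) {
            this.threads = threads;
        }

        void retain() {
            refCnt.incrementAndGet();
        }

        @Override
        public void close() {
            refCnt.decrementAndGet();
        }
    }

    // Same shape as validateSharedRef in the PR: the resource variable is never
    // read; the try exists only so close() runs before the exception escapes.
    static void validateImplicit(Lease lease, int threads) {
        if (threads != lease.threads) {
            try (Lease toRelease = lease) {
                throw new IllegalArgumentException("threads cannot be different from " + lease.threads);
            }
        }
    }

    // Same effect in this simplified model, with the release spelled out.
    static void validateExplicit(Lease lease, int threads) {
        if (threads != lease.threads) {
            // Build the message first, then give back the retained reference.
            String message = "threads cannot be different from " + lease.threads;
            lease.close();
            throw new IllegalArgumentException(message);
        }
    }

    public static void main(String[] args) {
        Lease lease = new Lease(2);
        lease.retain(); // what sharedGroupWith does before validating
        try {
            validateImplicit(lease, 4);
        } catch (IllegalArgumentException expected) {
            // The reference taken by retain() has been released again here.
            System.out.println("refCnt after failed validation: " + lease.refCnt.get());
        }
    }
}
```

   If the throwing behaviour is kept at all, the explicit form (or a comment on the implicit one) would make the intent visible to the next reader.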

##########
File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyEventLoopGroupPoolFactory.java
##########
@@ -0,0 +1,253 @@
+package org.apache.qpid.jms.transports.netty;
+
+import java.util.Objects;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.Future;
+import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.util.QpidJMSThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class NettyEventLoopGroupPoolFactory {
+
+   private NettyEventLoopGroupPoolFactory() {
+
+   }
+
+   public interface Ref<T> extends AutoCloseable {
+
+      T ref();
+
+      @Override
+      void close();
+   }
+
+   public interface EventLoopGroupPool {
+
+      Ref<EventLoopGroup> sharedGroupWith(int threads);
+
+   }
+
+   private static final Logger LOG = LoggerFactory.getLogger(NettyEventLoopGroupPool.class);
+   private static final int SHUTDOWN_TIMEOUT = 50;
+
+   private static EventLoopGroup createEventLoopGroup(final TransportOptions transportOptions,
+                                                      final ThreadFactory ioThreadFactory) {
+      final int threads = transportOptions.getSharedEventLoopThreads() > 0 ? transportOptions.getSharedEventLoopThreads() : 1;
+      final boolean useKQueue = KQueueSupport.isAvailable(transportOptions);
+      final boolean useEpoll = EpollSupport.isAvailable(transportOptions);
+      if (useKQueue) {
+         return createEventLoopGroup(threads, NettyEventLoopGroupPool.KQUEUE, ioThreadFactory);
+      }
+      if (useEpoll) {
+         return createEventLoopGroup(threads, NettyEventLoopGroupPool.EPOLL, ioThreadFactory);
+      }
+      return createEventLoopGroup(threads, NettyEventLoopGroupPool.NIO, ioThreadFactory);
+   }
+
+   private static EventLoopGroup createEventLoopGroup(final int threads,
+                                                      final NettyEventLoopGroupPool poolType,
+                                                      final ThreadFactory ioThreadFactory) {
+      switch (Objects.requireNonNull(poolType)) {
+
+         case EPOLL:
+            LOG.trace("Netty Transport using Epoll mode");
+            return EpollSupport.createGroup(threads, ioThreadFactory);
+         case KQUEUE:
+            LOG.trace("Netty Transport using KQueue mode");
+            return KQueueSupport.createGroup(threads, ioThreadFactory);
+         case NIO:
+            LOG.trace("Netty Transport using Nio mode");
+            return new NioEventLoopGroup(threads, ioThreadFactory);
+         default:
+            throw new AssertionError("unexpected poolType: " + poolType);
+      }
+   }
+
+   public static Ref<EventLoopGroup> unsharedGroupWith(final TransportOptions transportOptions,
+                                                       final ThreadFactory threadFactory) {
+
+      final EventLoopGroup ref = createEventLoopGroup(transportOptions, threadFactory);
+
+      return new Ref<>() {
+         @Override
+         public EventLoopGroup ref() {
+            return ref;
+         }
+
+         @Override
+         public void close() {
+            Future<?> fut = ref.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+            if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) {
+               LOG.trace("Channel group shutdown failed to complete in allotted time");
+            }
+         }
+      };
+   }
+
+   private static final class AtomicCloseableRef<T> implements Ref<T> {
+
+      private final Ref<T> ref;
+      private final AtomicBoolean closed;
+
+      public AtomicCloseableRef(final Ref<T> ref) {
+         this.ref = ref;
+         this.closed = new AtomicBoolean();
+      }
+
+      @Override
+      public T ref() {
+         return ref.ref();
+      }
+
+      @Override
+      public void close() {
+         if (closed.compareAndSet(false, true)) {
+            ref.close();
+         }
+      }
+   }
+
+   public static EventLoopGroupPool sharedGroupPoolWith(final TransportOptions transportOptions) {
+      if (transportOptions.getSharedEventLoopThreads() <= 0) {
+         throw new IllegalArgumentException("sharedEventLoopThreads must be > 0");
+      }
+      final boolean useKQueue = KQueueSupport.isAvailable(transportOptions);
+      final boolean useEpoll = EpollSupport.isAvailable(transportOptions);
+      if (useKQueue) {
+         return NettyEventLoopGroupPool.KQUEUE;
+      }
+      if (useEpoll) {
+         return NettyEventLoopGroupPool.EPOLL;
+      }
+      return NettyEventLoopGroupPool.NIO;

Review comment:
       See the other comment about duplication; it applies here as well.

##########
File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
##########
@@ -140,19 +140,15 @@ public ScheduledExecutorService connect(final Runnable initRoutine, SSLContext s
         boolean useKQueue = KQueueSupport.isAvailable(transportOptions);
         boolean useEpoll = EpollSupport.isAvailable(transportOptions);
 
-        if (useKQueue) {
-            LOG.trace("Netty Transport using KQueue mode");
-            group = KQueueSupport.createGroup(1, ioThreadfactory);
-        } else if (useEpoll) {
-            LOG.trace("Netty Transport using Epoll mode");
-            group = EpollSupport.createGroup(1, ioThreadfactory);
+        if (transportOptions.getSharedEventLoopThreads() > 0) {
+            group = sharedGroupPoolWith(transportOptions).sharedGroupWith(transportOptions.getSharedEventLoopThreads());

Review comment:
       The double call through is confusing and seems unnecessary.

##########
File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
##########
@@ -60,18 +59,19 @@
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
 
+import static org.apache.qpid.jms.transports.netty.NettyEventLoopGroupPoolFactory.sharedGroupPoolWith;
+import static org.apache.qpid.jms.transports.netty.NettyEventLoopGroupPoolFactory.unsharedGroupWith;
+
 /**
  * TCP based transport that uses Netty as the underlying IO layer.
  */
 public class NettyTcpTransport implements Transport {
 
     private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransport.class);
 
-    public static final int SHUTDOWN_TIMEOUT = 50;
     public static final int DEFAULT_MAX_FRAME_SIZE = 65535;
 
-    protected Bootstrap bootstrap;
-    protected EventLoopGroup group;
+    protected Ref<EventLoopGroup> group;

Review comment:
       This probably shouldn't be called group anymore; perhaps groupRef. The close really makes it look like the group is being closed, when the point here is that it probably isn't. Tweaking the field name (as you actually did do in a related test, hehe) would make that a touch more obvious.
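
   To illustrate why the name matters, here is a minimal single-threaded sketch of a reference-counted handle. SharedGroup and acquire() are hypothetical stand-ins rather than the PR's classes, and the sketch ignores the races the real code has to handle (for example acquiring after shutdown).

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class GroupRefNamingDemo {

    // Same shape as the Ref interface in the PR.
    interface Ref<T> extends AutoCloseable {
        T ref();

        @Override
        void close();
    }

    // Hypothetical stand-in for a shared event loop group.
    static final class SharedGroup {
        private final AtomicInteger refCnt = new AtomicInteger();
        private final AtomicBoolean shutDown = new AtomicBoolean();

        Ref<SharedGroup> acquire() {
            refCnt.incrementAndGet();
            final AtomicBoolean released = new AtomicBoolean();
            return new Ref<SharedGroup>() {
                @Override
                public SharedGroup ref() {
                    return SharedGroup.this;
                }

                @Override
                public void close() {
                    // Closing a reference only releases it (once); the group is
                    // shut down only when the last reference goes away.
                    if (released.compareAndSet(false, true) && refCnt.decrementAndGet() == 0) {
                        shutDown.set(true);
                    }
                }
            };
        }

        boolean isShutDown() {
            return shutDown.get();
        }
    }

    public static void main(String[] args) {
        SharedGroup group = new SharedGroup();
        Ref<SharedGroup> groupRef = group.acquire();
        Ref<SharedGroup> otherRef = group.acquire();

        groupRef.close(); // releases this reference; the group keeps running
        System.out.println("shut down after first close: " + group.isShutDown());

        otherRef.close(); // last reference gone, now the group is shut down
        System.out.println("shut down after last close: " + group.isShutDown());
    }
}
```

   Reading groupRef.close() at a call site signals that a reference is being released, whereas group.close() suggests the group itself goes away.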




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Shared Netty event loop group
> -----------------------------
>
>                 Key: QPIDJMS-553
>                 URL: https://issues.apache.org/jira/browse/QPIDJMS-553
>             Project: Qpid JMS
>          Issue Type: New Feature
>            Reporter: Francesco Nigro
>            Priority: Major
>
> One of the most interesting features of Netty when using KQueue/NIO/Epoll in non-blocking mode is the ability to handle many connections with few threads; this will become even more important with the upcoming IO_URING support, where the time spent on the Netty event loop handling network syscalls will be further reduced, allowing syscall batching across different connections.
> Being able to handle many client connections with few Netty threads is already beneficial in constrained environments (containers with few cores), as it reduces native and heap memory usage.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
