Posted to dev@qpid.apache.org by GitBox <gi...@apache.org> on 2022/03/02 13:06:08 UTC

[GitHub] [qpid-jms] franz1981 commented on a change in pull request #45: QPIDJMS-553 Shared Netty event loop group

franz1981 commented on a change in pull request #45:
URL: https://github.com/apache/qpid-jms/pull/45#discussion_r817671598



##########
File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyEventLoopGroupPoolFactory.java
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.netty;
+
+import java.util.Objects;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.Future;
+import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.util.QpidJMSThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class NettyEventLoopGroupPoolFactory {
+
+   private NettyEventLoopGroupPoolFactory() {
+
+   }
+
+   public interface Ref<T> extends AutoCloseable {
+
+      T ref();
+
+      @Override
+      void close();
+   }
+
+   public interface EventLoopGroupPool {
+
+      Ref<EventLoopGroup> sharedGroupWith(int threads);
+
+   }
+
+   private static final Logger LOG = LoggerFactory.getLogger(NettyEventLoopGroupPool.class);
+   private static final int SHUTDOWN_TIMEOUT = 50;
+
+   private static EventLoopGroup createEventLoopGroup(final TransportOptions transportOptions,
+                                                      final ThreadFactory ioThreadFactory) {
+      final int threads = transportOptions.getSharedEventLoopThreads() > 0 ? transportOptions.getSharedEventLoopThreads() : 1;
+      final boolean useKQueue = KQueueSupport.isAvailable(transportOptions);
+      final boolean useEpoll = EpollSupport.isAvailable(transportOptions);
+      if (useKQueue) {
+         return createEventLoopGroup(threads, NettyEventLoopGroupPool.KQUEUE, ioThreadFactory);
+      }
+      if (useEpoll) {
+         return createEventLoopGroup(threads, NettyEventLoopGroupPool.EPOLL, ioThreadFactory);
+      }
+      return createEventLoopGroup(threads, NettyEventLoopGroupPool.NIO, ioThreadFactory);
+   }
+
+   private static EventLoopGroup createEventLoopGroup(final int threads,
+                                                      final NettyEventLoopGroupPool poolType,
+                                                      final ThreadFactory ioThreadFactory) {
+      switch (Objects.requireNonNull(poolType)) {
+
+         case EPOLL:
+            LOG.trace("Netty Transport using Epoll mode");
+            return EpollSupport.createGroup(threads, ioThreadFactory);
+         case KQUEUE:
+            LOG.trace("Netty Transport using KQueue mode");
+            return KQueueSupport.createGroup(threads, ioThreadFactory);
+         case NIO:
+            LOG.trace("Netty Transport using Nio mode");
+            return new NioEventLoopGroup(threads, ioThreadFactory);
+         default:
+            throw new AssertionError("unexpected poolType: " + poolType);
+      }
+   }
+
+   public static Ref<EventLoopGroup> unsharedGroupWith(final TransportOptions transportOptions,
+                                                       final ThreadFactory threadFactory) {
+
+      final EventLoopGroup ref = createEventLoopGroup(transportOptions, threadFactory);
+
+      return new Ref<>() {
+         @Override
+         public EventLoopGroup ref() {
+            return ref;
+         }
+
+         @Override
+         public void close() {
+            Future<?> fut = ref.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+            if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) {
+               LOG.trace("Channel group shutdown failed to complete in allotted time");
+            }
+         }
+      };
+   }
+
+   private static final class AtomicCloseableRef<T> implements Ref<T> {
+
+      private final Ref<T> ref;
+      private final AtomicBoolean closed;
+
+      public AtomicCloseableRef(final Ref<T> ref) {
+         this.ref = ref;
+         this.closed = new AtomicBoolean();
+      }
+
+      @Override
+      public T ref() {
+         return ref.ref();
+      }
+
+      @Override
+      public void close() {
+         if (closed.compareAndSet(false, true)) {
+            ref.close();
+         }
+      }
+   }
+
+   public static EventLoopGroupPool sharedGroupPoolWith(final TransportOptions transportOptions) {
+      if (transportOptions.getSharedEventLoopThreads() <= 0) {
+         throw new IllegalArgumentException("sharedEventLoopThreads must be > 0");
+      }
+      final boolean useKQueue = KQueueSupport.isAvailable(transportOptions);
+      final boolean useEpoll = EpollSupport.isAvailable(transportOptions);
+      if (useKQueue) {
+         return NettyEventLoopGroupPool.KQUEUE;
+      }
+      if (useEpoll) {
+         return NettyEventLoopGroupPool.EPOLL;
+      }
+      return NettyEventLoopGroupPool.NIO;
+   }
+
+   private enum NettyEventLoopGroupPool implements EventLoopGroupPool {
+      EPOLL, KQUEUE, NIO;
+
+      private static final AtomicInteger SHARED_NETTY_EVENT_LOOP_GROUP_SEQUENCE = new AtomicInteger(0);
+
+      private final class SharedGroupRef implements Ref<EventLoopGroup> {
+
+         private final EventLoopGroup group;
+         private final AtomicInteger refCnt;
+         private final int threads;
+
+         private SharedGroupRef(final EventLoopGroup group, final int threads) {
+            this.threads = threads;
+            this.group = group;
+            refCnt = new AtomicInteger(1);
+         }
+
+         public int getThreads() {
+            if (refCnt.get() == 0) {
+               throw new IllegalStateException("the event loop group cannot be reused");
+            }
+            return threads;
+         }
+
+         public boolean retain() {
+            while (true) {
+               final int currValue = refCnt.get();
+               if (currValue == 0) {
+                  // this has been already disposed!
+                  return false;
+               }
+               if (refCnt.compareAndSet(currValue, currValue + 1)) {
+                  return true;
+               }
+            }
+         }
+
+         @Override
+         public EventLoopGroup ref() {
+            if (refCnt.get() == 0) {
+               throw new IllegalStateException("the event loop group cannot be reused");
+            }
+            return group;
+         }
+
+         @Override
+         public void close() {
+            while (true) {
+               final int currValue = refCnt.get();
+               if (currValue == 0) {
+                  return;
+               }
+               if (refCnt.compareAndSet(currValue, currValue - 1)) {
+                  if (currValue == 1) {
+                     // if a racing thread has borrowed this lease, it would try already to set it to a new value:
+                     // this one is a best effort cleanup to help GC
+                     sharedEventLoopGroup.compareAndSet(this, null);
+                     Future<?> fut = group.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+                     if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) {
+                        LOG.trace("Channel group shutdown failed to complete in allotted time");
+                     }
+                  }
+                  return;
+               }
+            }
+         }
+      }
+
+      private final AtomicReference<SharedGroupRef> sharedEventLoopGroup = new AtomicReference<>(null);
+
+      @Override
+      public Ref<EventLoopGroup> sharedGroupWith(final int threads) {
+         if (threads <= 0) {
+            throw new IllegalArgumentException("threads must be > 0");
+         }
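+         // Fast path: try to retain an already-published shared group without locking;
+         // otherwise fall back to a synchronized double-check that creates a fresh group
+         // when none exists yet or the previous one has already been disposed.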
+         while (true) {
+            SharedGroupRef sharedGroupRef = sharedEventLoopGroup.get();
+            if (sharedGroupRef != null && sharedGroupRef.retain()) {
+               validateSharedRef(sharedGroupRef, threads);
+               return new AtomicCloseableRef<>(sharedGroupRef);
+            }
+            synchronized (this) {
+               sharedGroupRef = sharedEventLoopGroup.get();
+               if (sharedGroupRef != null && sharedGroupRef.retain()) {
+                  validateSharedRef(sharedGroupRef, threads);
+                  return new AtomicCloseableRef<>(sharedGroupRef);
+               }
+               sharedGroupRef = new SharedGroupRef(createEventLoopGroup(threads, this, new QpidJMSThreadFactory("SharedNettyEventLoopGroup :(" + SHARED_NETTY_EVENT_LOOP_GROUP_SEQUENCE.incrementAndGet() + ")", true)), threads);
+               sharedEventLoopGroup.set(sharedGroupRef);

Review comment:
       The group here would be shared by different connections, so I cannot see how to make that information clearly visible other than by concurrently modifying the thread factory each time a new connection uses it (which doesn't seem like a good option, because it can happen that 3 Netty threads end up handling hundreds of physical connections).
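
For context, here is a minimal usage sketch of the API proposed in this patch, showing how a connection might borrow and release a shared event loop group. The setSharedEventLoopThreads(int) setter on TransportOptions is an assumption (only the getter appears in this diff), and the surrounding class and channel bootstrap are illustrative only:

    import io.netty.channel.EventLoopGroup;
    import org.apache.qpid.jms.transports.TransportOptions;
    import org.apache.qpid.jms.transports.netty.NettyEventLoopGroupPoolFactory;
    import org.apache.qpid.jms.transports.netty.NettyEventLoopGroupPoolFactory.EventLoopGroupPool;
    import org.apache.qpid.jms.transports.netty.NettyEventLoopGroupPoolFactory.Ref;

    public final class SharedGroupUsageSketch {

        public static void main(String[] args) {
            TransportOptions options = new TransportOptions();
            options.setSharedEventLoopThreads(3); // assumed setter for the sharedEventLoopThreads option

            // Resolve the pool once (EPOLL, KQUEUE or NIO) based on the transport options.
            EventLoopGroupPool pool = NettyEventLoopGroupPoolFactory.sharedGroupPoolWith(options);

            // Each connection borrows a reference; the group is created on first use and is
            // shut down gracefully once the last reference has been closed.
            try (Ref<EventLoopGroup> groupRef = pool.sharedGroupWith(options.getSharedEventLoopThreads())) {
                EventLoopGroup group = groupRef.ref();
                // Bootstrap the Netty channel for this connection against 'group' here.
            }
        }
    }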




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


