Posted to dev@qpid.apache.org by GitBox <gi...@apache.org> on 2021/11/05 16:18:11 UTC

[GitHub] [qpid-jms] gemmellr commented on a change in pull request #44: QPIDJMS-552 JMS 2 Completion threads shouldn't scale with the number of sessions

gemmellr commented on a change in pull request #44:
URL: https://github.com/apache/qpid-jms/pull/44#discussion_r740906037



##########
File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
##########
@@ -151,6 +155,7 @@ public void run() {
 
         this.connectionInfo = connectionInfo;
         this.connectionInfo.setConnection(this);
+        this.completionExecutorService = this.connectionInfo.getCompletionExecutorServiceFactory().map(Supplier::get).orElse(null);

Review comment:
       Calling the field e.g. 'sharedCompletionExecutorHolder' would make its use clearer.
   
   Similarly, getCompletionExecutorServiceFactory feels a bit lengthy and doesn't reflect that it only covers the shared one; consider e.g. getSharedCompletionExecutorFactory.
   
   Actually, it feels like most of this line could be done up front, either in the ConnectionFactory or inside the ConnectionInfo, so that this line becomes a simple getSharedCompletionExecutorHolder call that either returns null (as it will by default) or the holder.

##########
File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
##########
@@ -591,6 +613,14 @@ public long getConnectTimeout() {
         return this.connectTimeout;
     }
 
+    public void setCompletionThreads(final int completionThreads) {

Review comment:
       I'd go with SharedCompletionThreads to emphasize that it's shared, given that the sharing is both within a connection and across connections.
   
   EDIT: actually, is it? I'm not seeing where it would actually share. A new SharedDisposable looks to be made afresh each time and has no statics to share across anything, which would seem to mean it's just a bigger per-connection pool.
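
To illustrate the point about sharing, here is a minimal sketch of a reference-counted executor holder. The class name RefCountedExecutorSketch and its ref()/unref() methods are hypothetical and are not the PR's Holder or SharedDisposable types. The assumption is that sharing only happens when every user calls ref() on the same holder instance; a holder created afresh per connection gives each connection its own pool.

```java
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

// Hypothetical illustration only; not the types used in the pull request.
final class RefCountedExecutorSketch {
    private final Supplier<ExecutorService> factory;
    private ExecutorService executor; // guarded by this
    private int refCount;             // guarded by this

    RefCountedExecutorSketch(Supplier<ExecutorService> factory) {
        this.factory = factory;
    }

    // Creates the executor on first use and hands the same instance to every caller.
    synchronized ExecutorService ref() {
        if (refCount == 0) {
            executor = factory.get();
        }
        refCount++;
        return executor;
    }

    // Shuts the executor down once the last user has released it.
    synchronized void unref() {
        if (refCount == 0) {
            throw new IllegalStateException("unref without matching ref");
        }
        if (--refCount == 0) {
            executor.shutdown();
            executor = null;
        }
    }
}
```

Two callers of ref() on one holder get the same executor, whereas two separately constructed holders never do, which is the situation described above.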

##########
File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
##########
@@ -194,6 +198,48 @@ public void onPendingFailure(ProviderException cause) {
         }
     }
 
+    private void processCompletions() {
+        assert processCompletion.get();
+        completionThread = Thread.currentThread();
+        try {
+            final Runnable completionTask = completionTasks.poll();
+            if (completionTask != null) {
+                try {
+                    completionTask.run();
+                } catch (Throwable t) {
+                    LOG.debug("errored on processCompletions duty cycle", t);
+                }
+            }
+        } finally {
+            completionThread = null;
+            processCompletion.set(false);
+        }
+        if (completionTasks.isEmpty()) {
+            return;
+        }
+        // a racing asyncProcessCompletion has won: no need to fire a continuation
+        if (!processCompletion.compareAndSet(false, true)) {
+            return;
+        }
+        getCompletionExecutor().execute(this::processCompletions);
+    }
+
+    private void asyncProcessCompletion(final Runnable completionTask) {
+        asyncProcessCompletion(completionTask, false);
+    }
+
+    private void asyncProcessCompletion(final Runnable completionTask, final boolean ignoreSessionClosed) {
+        if (!ignoreSessionClosed) {

Review comment:
       I don't think the session-closed check works. The closed boolean is set to true as soon as session shutdown starts, but successful async completions can still arrive before the closure completes, simply because both they and the session close are asynchronous, so they may be in flight when shutdown begins. I expect this change would mean they could get marked as failed when they shouldn't be: the completion task would be dropped instead of running, and then the fallback 'session closed, fail all outstanding completions' task would run and report them as failed.
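
The ordering described above can be modelled with a small single-threaded sketch. Everything in it (ClosedCheckRaceSketch, beginClose, onSendSucceeded, finishClose, the string states) is a hypothetical simplification and not JmsSession code. It only assumes that the closed flag flips before the 'fail all outstanding completions' step runs, and that a success notification can land in between.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of the race; not the session implementation.
final class ClosedCheckRaceSketch {
    private final AtomicBoolean closed = new AtomicBoolean();
    private final Map<String, String> outcomes = new LinkedHashMap<>();
    private final boolean dropWhenClosed;

    ClosedCheckRaceSketch(boolean dropWhenClosed) {
        this.dropWhenClosed = dropWhenClosed;
    }

    void send(String id) {
        outcomes.put(id, "PENDING");
    }

    // Shutdown starts: the flag flips before outstanding sends are resolved.
    void beginClose() {
        closed.set(true);
    }

    // A success notification that was already in flight when close began.
    void onSendSucceeded(String id) {
        if (dropWhenClosed && closed.get()) {
            return; // the completion task is dropped instead of running
        }
        outcomes.replace(id, "PENDING", "SUCCEEDED");
    }

    // Fallback: anything still pending when close finishes is reported as failed.
    void finishClose() {
        outcomes.replaceAll((id, state) -> "PENDING".equals(state) ? "FAILED" : state);
    }

    String outcome(String id) {
        return outcomes.get(id);
    }
}
```

Running send, beginClose, onSendSucceeded, finishClose in that order leaves the message reported as failed when the closed check drops the completion, and as succeeded when it does not.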

##########
File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
##########
@@ -194,6 +198,48 @@ public void onPendingFailure(ProviderException cause) {
         }
     }
 
+    private void processCompletions() {
+        assert processCompletion.get();
+        completionThread = Thread.currentThread();
+        try {
+            final Runnable completionTask = completionTasks.poll();
+            if (completionTask != null) {
+                try {
+                    completionTask.run();
+                } catch (Throwable t) {
+                    LOG.debug("errored on processCompletions duty cycle", t);
+                }
+            }
+        } finally {
+            completionThread = null;
+            processCompletion.set(false);
+        }
+        if (completionTasks.isEmpty()) {
+            return;
+        }
+        // a racing asyncProcessCompletion has won: no need to fire a continuation
+        if (!processCompletion.compareAndSet(false, true)) {
+            return;
+        }
+        getCompletionExecutor().execute(this::processCompletions);

Review comment:
       Add a newline before the isEmpty check; it's moving on to a different unit of the work, really.
   
   I think it would be more readable to either combine the two 'I can just stop now' checks into a single if (empty || !CAS) { return; }, or instead gate the execute call inside the if (CAS), the same way it is done in the asyncProcessCompletion method below.
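
A minimal sketch of one possible shape, with the execute call gated on the queue being non-empty and the CAS succeeding. CompletionDrainSketch is a hypothetical stand-in that borrows the field names from the diff; it is not the PR's JmsSession, and it omits the closed-session handling and the completionThread bookkeeping.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the completion handling; not the PR's code.
final class CompletionDrainSketch {
    private final Queue<Runnable> completionTasks = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean processCompletion = new AtomicBoolean();
    private final Executor executor;

    CompletionDrainSketch(Executor executor) {
        this.executor = executor;
    }

    void asyncProcessCompletion(Runnable task) {
        completionTasks.add(task);
        // Only the caller that wins the flag schedules a drain.
        if (processCompletion.compareAndSet(false, true)) {
            executor.execute(this::processCompletions);
        }
    }

    private void processCompletions() {
        try {
            Runnable task = completionTasks.poll();
            if (task != null) {
                try {
                    task.run();
                } catch (Throwable t) {
                    // A failing task must not prevent later tasks from running.
                }
            }
        } finally {
            processCompletion.set(false);
        }

        // Reschedule only if work remains and this thread wins the flag;
        // otherwise a racing asyncProcessCompletion has already scheduled a drain.
        if (!completionTasks.isEmpty() && processCompletion.compareAndSet(false, true)) {
            executor.execute(this::processCompletions);
        }
    }
}
```

The behaviour matches the two separate early returns in the diff; the single condition just mirrors how asyncProcessCompletion gates its own execute call.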

##########
File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
##########
@@ -1195,23 +1243,27 @@ Executor getDispatcherExecutor() {
     }
 
     private ExecutorService getCompletionExecutor() {
-        ThreadPoolExecutor exec = completionExcecutor;
+        ExecutorService exec = completionExecutor;
         if (exec == null) {
             synchronized (sessionInfo) {
-                exec = completionExcecutor;
+                exec = completionExecutor;
                 if (exec == null) {
-                    exec = createExecutor("completion dispatcher", completionThread);
-
+                    if (connection.getCompletionExecutorService() != null) {
+                        exec = connection.getCompletionExecutorService().ref();
+                    } else {
+                        exec = createExecutor("completion dispatcher", null);
+                    }
                     // Ensure work thread is fully up before allowing other threads
                     // to attempt to execute on this instance.
-                    Future<?> starter = exec.submit(() -> {});
+                    Future<?> starter = exec.submit(() -> {
+                    });

Review comment:
       Leave it on a single line; it's clearer.

##########
File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
##########
@@ -106,7 +109,7 @@
     private final AtomicBoolean started = new AtomicBoolean();
     private final AtomicReference<Exception> failureCause = new AtomicReference<>();
     private final JmsConnectionInfo connectionInfo;
-    private final ThreadPoolExecutor executor;
+    protected final ThreadPoolExecutor executor;

Review comment:
       Why does this need to be exposed?

##########
File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
##########
@@ -368,6 +377,19 @@ protected static URI createURI(String name) {
         return null;
     }
 
+    protected Supplier<Holder<ExecutorService>> getCompletionExecutorServiceFactory() {
+        if (this.completionThreads == 0) {
+            return null;
+        }
+        synchronized (this) {
+            if (completionExecutorServiceFactory == null) {
+                QpidJMSForkJoinWorkerThreadFactory fjThreadFactory = new QpidJMSForkJoinWorkerThreadFactory("completion thread pool", true);
+                completionExecutorServiceFactory = sharedRefCnt(() -> new ForkJoinPool(completionThreads, fjThreadFactory, null, false), ThreadPoolUtils::shutdown);

Review comment:
       It doesn't feel like we need a supplier of a holder of an executor. Just a holder.
   
   I assume that's to try to avoid creating it? It seems somewhat implicit that if you set the option to get this shared executor behaviour, then you actually want it and thus can be expected to need it, so it's not really clear to me that it's worth the extra mechanics.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org
