Posted to dev@qpid.apache.org by GitBox <gi...@apache.org> on 2021/11/05 20:46:45 UTC

[GitHub] [qpid-jms] franz1981 commented on a change in pull request #44: QPIDJMS-552 JMS 2 Completion threads shouldn't scale with the number of sessions

franz1981 commented on a change in pull request #44:
URL: https://github.com/apache/qpid-jms/pull/44#discussion_r743939643



##########
File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
##########
@@ -591,6 +613,14 @@ public long getConnectTimeout() {
         return this.connectTimeout;
     }
 
+    public void setCompletionThreads(final int completionThreads) {

Review comment:
       > EDIT: actually, is it? I'm not seeing where it would actually share?
   
   The completion factory creates a single `sharedRefCnt` supplier wrapping a `ForkJoinPool`, so the same FJ pool is shared until every connection that references it has been closed. 
   When that happens, the last connection to close disposes of the pool, leaving any later connections able to create a new one, similarly to the shared event loop group of #45.
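
   The sharing scheme described above can be sketched roughly as follows. This is only an illustration, not the `sharedRefCnt` code from the PR: `SharedRefCountSketch`, `Ref`, `acquire()` and `release()` are hypothetical names, and plain `synchronized` is assumed for simplicity.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical sketch of a reference-counted shared resource; not the qpid-jms code.
final class SharedRefCountSketch<T> {

    /** One acquired reference; call release() when the owner (e.g. a connection) closes. */
    interface Ref<T> {
        T get();
        void release();
    }

    private final Supplier<T> factory;
    private final Consumer<T> disposer;
    private T instance;   // guarded by 'this'
    private int refCount; // guarded by 'this'

    SharedRefCountSketch(Supplier<T> factory, Consumer<T> disposer) {
        this.factory = factory;
        this.disposer = disposer;
    }

    /** Creates the resource lazily on first use, otherwise shares the existing one. */
    synchronized Ref<T> acquire() {
        if (refCount == 0) {
            instance = factory.get();
        }
        refCount++;
        final T acquired = instance;
        return new Ref<T>() {
            private boolean released; // guarded by the outer monitor

            @Override
            public T get() {
                return acquired;
            }

            @Override
            public void release() {
                synchronized (SharedRefCountSketch.this) {
                    if (released) {
                        return; // a double release is harmless
                    }
                    released = true;
                    if (--refCount == 0) {
                        // The last user disposes; a later acquire() creates a fresh instance.
                        disposer.accept(instance);
                        instance = null;
                    }
                }
            }
        };
    }

    public static void main(String[] args) {
        SharedRefCountSketch<ExecutorService> shared =
            new SharedRefCountSketch<>(() -> new ForkJoinPool(2), ExecutorService::shutdown);
        Ref<ExecutorService> first = shared.acquire();
        Ref<ExecutorService> second = shared.acquire();
        System.out.println(first.get() == second.get()); // true: both share one pool
        first.release();
        second.release(); // last release shuts the pool down
    }
}
```

   Nothing is allocated until the first `acquire()`, so a factory with no connections holds no threads.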

##########
File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
##########
@@ -368,6 +377,19 @@ protected static URI createURI(String name) {
         return null;
     }
 
+    protected Supplier<Holder<ExecutorService>> getCompletionExecutorServiceFactory() {
+        if (this.completionThreads == 0) {
+            return null;
+        }
+        synchronized (this) {
+            if (completionExecutorServiceFactory == null) {
+                QpidJMSForkJoinWorkerThreadFactory fjThreadFactory = new QpidJMSForkJoinWorkerThreadFactory("completion thread pool", true);
+                completionExecutorServiceFactory = sharedRefCnt(() -> new ForkJoinPool(completionThreads, fjThreadFactory, null, false), ThreadPoolUtils::shutdown);

Review comment:
       > I assume thats to try and avoid creating it?
   
   Given that we cannot rely on finalization of the connection factory, I cannot pre-allocate it when there are no actual "users", i.e. connections. I would also like it to be correctly disposed of and shut down once every connection belonging to the connection factory has been closed.

##########
File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
##########
@@ -194,6 +198,48 @@ public void onPendingFailure(ProviderException cause) {
         }
     }
 
+    private void processCompletions() {
+        assert processCompletion.get();
+        completionThread = Thread.currentThread();
+        try {
+            final Runnable completionTask = completionTasks.poll();
+            if (completionTask != null) {
+                try {
+                    completionTask.run();
+                } catch (Throwable t) {
+                    LOG.debug("errored on processCompletions duty cycle", t);
+                }
+            }
+        } finally {
+            completionThread = null;
+            processCompletion.set(false);
+        }
+        if (completionTasks.isEmpty()) {
+            return;
+        }
+        // a racing asyncProcessCompletion has won: no need to fire a continuation
+        if (!processCompletion.compareAndSet(false, true)) {
+            return;
+        }
+        getCompletionExecutor().execute(this::processCompletions);
+    }
+
+    private void asyncProcessCompletion(final Runnable completionTask) {
+        asyncProcessCompletion(completionTask, false);
+    }
+
+    private void asyncProcessCompletion(final Runnable completionTask, final boolean ignoreSessionClosed) {
+        if (!ignoreSessionClosed) {

Review comment:
       I can let them pass, but this check was meant to mimic the original reject handler installed on the single-threaded completion executor. I admit I didn't put much thought into validating whether the check can be omitted.
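
   For context, this is roughly what a reject handler on a single-threaded executor looks like. It is a generic sketch, not the handler from the original `JmsSession`: `RejectHandlerSketch` and `demo()` are hypothetical names, and counting then dropping the rejected task is an assumption made only for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; the handler's behaviour (count and drop) is an assumption.
final class RejectHandlerSketch {
    final AtomicInteger rejected = new AtomicInteger();

    // Single worker thread, unbounded queue, custom handler for rejected tasks.
    final ThreadPoolExecutor executor = new ThreadPoolExecutor(
        1, 1, 0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>(),
        // Called in the submitting thread instead of throwing RejectedExecutionException.
        (task, pool) -> rejected.incrementAndGet());

    /** Submits one task, shuts down, then submits a late task; returns the reject count. */
    static int demo() {
        RejectHandlerSketch sketch = new RejectHandlerSketch();
        AtomicInteger completed = new AtomicInteger();
        sketch.executor.execute(completed::incrementAndGet);
        sketch.executor.shutdown();                          // already queued work still runs
        sketch.executor.execute(completed::incrementAndGet); // handed to the reject handler
        return sketch.rejected.get();
    }
}
```

   Inlining such a policy means performing the "is it closed?" test before enqueueing, which is what the `ignoreSessionClosed` flag guards.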

##########
File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
##########
@@ -194,6 +198,48 @@ public void onPendingFailure(ProviderException cause) {
         }
     }
 
+    private void processCompletions() {
+        assert processCompletion.get();
+        completionThread = Thread.currentThread();
+        try {
+            final Runnable completionTask = completionTasks.poll();
+            if (completionTask != null) {
+                try {
+                    completionTask.run();
+                } catch (Throwable t) {
+                    LOG.debug("errored on processCompletions duty cycle", t);
+                }
+            }
+        } finally {
+            completionThread = null;
+            processCompletion.set(false);
+        }
+        if (completionTasks.isEmpty()) {
+            return;
+        }
+        // a racing asyncProcessCompletion has won: no need to fire a continuation
+        if (!processCompletion.compareAndSet(false, true)) {
+            return;
+        }
+        getCompletionExecutor().execute(this::processCompletions);
+    }
+
+    private void asyncProcessCompletion(final Runnable completionTask) {
+        asyncProcessCompletion(completionTask, false);
+    }
+
+    private void asyncProcessCompletion(final Runnable completionTask, final boolean ignoreSessionClosed) {
+        if (!ignoreSessionClosed) {

Review comment:
       > I expect this change would mean they could get marked as failed when they shouldn't
   
   I implemented it intending to mimic (by inlining) the reject policy of the single-threaded executor. I admit I didn't put much thought into this: it can probably be treated differently, and it may introduce slightly different semantics, but I still don't see any harm. My expectation is that when session::shutdown is called, any already-submitted completion should be handled (that's visible in the new shutdown logic using CountDownLatch), while new submissions sent *after* shutdown is initiated would be processed only if they are part of the `shutdown` logic itself (i.e. `ignoreSessionClosed == true`) and ignored otherwise.
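
   The expected semantics can be sketched as below. This is a simplified stand-in rather than the PR code: `CompletionQueueSketch`, `submit` and the `closed` flag are hypothetical, and the sketch does not close the small window between the closed check and the enqueue.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of serialized completions on a shared executor; not the PR code.
final class CompletionQueueSketch {
    private final Queue<Runnable> completionTasks = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean processCompletion = new AtomicBoolean();
    private final AtomicBoolean closed = new AtomicBoolean();
    private final Executor executor;

    CompletionQueueSketch(Executor executor) {
        this.executor = executor;
    }

    /** Returns false when the task is ignored because shutdown has already started. */
    boolean submit(Runnable task, boolean ignoreSessionClosed) {
        if (!ignoreSessionClosed && closed.get()) {
            return false; // inlined "reject policy"
        }
        completionTasks.add(task);
        // Only one drain may be scheduled or running at a time.
        if (processCompletion.compareAndSet(false, true)) {
            executor.execute(this::processCompletions);
        }
        return true;
    }

    private void processCompletions() {
        try {
            Runnable task = completionTasks.poll();
            if (task != null) {
                try {
                    task.run();
                } catch (Throwable t) {
                    // A failing completion must not stop the queue from draining.
                }
            }
        } finally {
            processCompletion.set(false);
        }
        // Reschedule unless a racing submit() has already done so.
        if (!completionTasks.isEmpty() && processCompletion.compareAndSet(false, true)) {
            executor.execute(this::processCompletions);
        }
    }

    /** Stops accepting new work, then waits for everything queued so far. */
    void shutdown() {
        closed.set(true);
        CountDownLatch drained = new CountDownLatch(1);
        submit(drained::countDown, true); // queued behind all earlier completions
        try {
            drained.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

   Because tasks run one at a time in FIFO order, the latch task only fires after every completion submitted before `shutdown()` has run.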




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


