Posted to dev@cxf.apache.org by GitBox <gi...@apache.org> on 2020/07/15 02:47:55 UTC

[GitHub] [cxf] reta opened a new pull request #683: CXF-8303: MP: Context propagation impossible using AsyncInvocationInterceptorFactory

reta opened a new pull request #683:
URL: https://github.com/apache/cxf/pull/683


   1. Aligning MP Client implementation to use only one executor service, configured through `MicroProfileClientFactoryBean` and propagated via contextual property `EXECUTOR_SERVICE_PROPERTY`
   2. Always initialize executor service in the `MicroProfileClientFactoryBean`, using `ForkJoin` pool as default if not specified
   3. Removed async delegation from `MPRestClientCallback::createFuture` since it is always executed in the context of executor service (configured through `MicroProfileClientFactoryBean`)
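
Points 1 and 2 above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the CXF code: `ClientFactorySketch`, its `contextualProperties()` method and the key string are hypothetical stand-ins for `MicroProfileClientFactoryBean` and `EXECUTOR_SERVICE_PROPERTY`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;

// Hypothetical stand-in for the factory bean; all names here are assumptions.
final class ClientFactorySketch {
    // Assumed key value; the real constant is defined in CXF and not shown in this thread.
    static final String EXECUTOR_SERVICE_PROPERTY = "sketch.executorService";

    private final ExecutorService executorService;

    ClientFactorySketch(ExecutorService executorService) {
        // Always end up with an executor: fall back to the common pool if none was configured.
        this.executorService = (executorService == null) ? ForkJoinPool.commonPool() : executorService;
    }

    // Publish the single executor as a contextual property so later stages can look it up.
    Map<String, Object> contextualProperties() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(EXECUTOR_SERVICE_PROPERTY, executorService);
        return properties;
    }
}
```

With this shape, every consumer reads the same executor from one place instead of each falling back to its own default.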


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [cxf] rmannibucau commented on a change in pull request #683: CXF-8303: MP: Context propagation impossible using AsyncInvocationInterceptorFactory

Posted by GitBox <gi...@apache.org>.
rmannibucau commented on a change in pull request #683:
URL: https://github.com/apache/cxf/pull/683#discussion_r456239356



##########
File path: rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MicroProfileClientFactoryBean.java
##########
@@ -63,7 +66,7 @@ public MicroProfileClientFactoryBean(MicroProfileClientConfigurableImpl<RestClie
         super(new MicroProfileServiceFactoryBean());
         this.configuration = configuration.getConfiguration();
         this.comparator = MicroProfileClientProviderFactory.createComparator(this);
-        this.executorService = executorService;
+        this.executorService = (executorService == null) ? defaultExecutorService() : executorService; 

Review comment:
       Yep, that was not a good phrasing choice. The fact is that getting the common pool triggers its (static variable) instantiation, which brings a lot of work, for nothing in some cases, and that is what I'd like to avoid.
   Having a facade makes it possible to just load a class and do a `new` without pulling in dependencies (static init), which speeds up startup.
   We faced the same issue in OpenWebBeans, which is why we have this wrapper: https://github.com/apache/openwebbeans/blob/9b04c5e906294bcad7c61f30b5c5fcc8956b2f17/webbeans-impl/src/main/java/org/apache/webbeans/event/NotificationManager.java#L954.
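
A minimal sketch of the facade idea, using only the JDK. `LazyExecutorFacade` and its members are hypothetical names for illustration; this is neither the OpenWebBeans wrapper nor the code in this PR. Constructing the facade is cheap, and the real executor is resolved only when the first task arrives.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;

// Hypothetical facade: the delegate is looked up on first use, not at construction time.
final class LazyExecutorFacade implements Executor {
    private final Supplier<? extends Executor> delegateSupplier;
    private volatile Executor delegate;

    LazyExecutorFacade(Supplier<? extends Executor> delegateSupplier) {
        this.delegateSupplier = delegateSupplier;
    }

    // Convenience factory: commonPool() is not called until the first execute().
    static LazyExecutorFacade overCommonPool() {
        return new LazyExecutorFacade(() -> ForkJoinPool.commonPool());
    }

    boolean isResolved() {
        return delegate != null;
    }

    @Override
    public void execute(Runnable command) {
        Executor resolved = delegate;
        if (resolved == null) {
            synchronized (this) {
                resolved = delegate;
                if (resolved == null) {
                    // First use: resolve the real executor exactly once.
                    resolved = delegateSupplier.get();
                    delegate = resolved;
                }
            }
        }
        resolved.execute(command);
    }
}
```

An application that never issues an async call never invokes the supplier, so it never pays for the pool's initialization.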







[GitHub] [cxf] reta commented on a change in pull request #683: CXF-8303: MP: Context propagation impossible using AsyncInvocationInterceptorFactory

Posted by GitBox <gi...@apache.org>.
reta commented on a change in pull request #683:
URL: https://github.com/apache/cxf/pull/683#discussion_r455389430



##########
File path: systests/microprofile/client/async/pom.xml
##########
@@ -76,6 +76,12 @@
                 <version>${cxf.johnzon.version}</version>
                 <scope>test</scope>
             </dependency>
+            <dependency>

Review comment:
       Sure, done, thanks!







[GitHub] [cxf] reta commented on a change in pull request #683: CXF-8303: MP: Context propagation impossible using AsyncInvocationInterceptorFactory

Posted by GitBox <gi...@apache.org>.
reta commented on a change in pull request #683:
URL: https://github.com/apache/cxf/pull/683#discussion_r456685850



##########
File path: rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MicroProfileClientFactoryBean.java
##########
@@ -63,7 +66,7 @@ public MicroProfileClientFactoryBean(MicroProfileClientConfigurableImpl<RestClie
         super(new MicroProfileServiceFactoryBean());
         this.configuration = configuration.getConfiguration();
         this.comparator = MicroProfileClientProviderFactory.createComparator(this);
-        this.executorService = executorService;
+        this.executorService = (executorService == null) ? defaultExecutorService() : executorService; 

Review comment:
       :+1: thanks for bearing with me, the concern is clear now, fixed!







[GitHub] [cxf] reta commented on a change in pull request #683: CXF-8303: MP: Context propagation impossible using AsyncInvocationInterceptorFactory

Posted by GitBox <gi...@apache.org>.
reta commented on a change in pull request #683:
URL: https://github.com/apache/cxf/pull/683#discussion_r455391971



##########
File path: rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MicroProfileClientFactoryBean.java
##########
@@ -63,7 +66,7 @@ public MicroProfileClientFactoryBean(MicroProfileClientConfigurableImpl<RestClie
         super(new MicroProfileServiceFactoryBean());
         this.configuration = configuration.getConfiguration();
         this.comparator = MicroProfileClientProviderFactory.createComparator(this);
-        this.executorService = executorService;
+        this.executorService = (executorService == null) ? defaultExecutorService() : executorService; 

Review comment:
       Thanks for the review! This one is very tricky to make lazy. Essentially, this `executorService` sets (or unsets) the CXF's `EXECUTOR_SERVICE_PROPERTY` which basically controls how the async client invocation should be handled (it is used by MP and other clients). If we don't provide `executorService` upfront, the property won't be set and async callback (`MPRestClientCallback`) is going to be called in the context of CXF's work queue pool. The common `ForkJoinPool` is present 99.9% of the time so it is not too expensive to reference it (but lazy would be better for sure).
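
To illustrate why the executor has to be known upfront, here is a plain-JDK sketch (no CXF involved; `CallbackThreadSketch` is a hypothetical name). Work handed to an explicitly supplied executor runs on that executor's threads, which is the effect the property is meant to guarantee for the async callback.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CallbackThreadSketch {
    // Runs a stand-in "async callback" on the given executor and reports which thread ran it.
    static String callbackThreadName(ExecutorService executor) throws Exception {
        return CompletableFuture
            .supplyAsync(() -> Thread.currentThread().getName(), executor)
            .get();
    }

    public static void main(String[] args) throws Exception {
        // A single named thread makes it obvious where the callback ran.
        ExecutorService configured =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "configured-pool-1"));
        try {
            System.out.println(callbackThreadName(configured));
        } finally {
            configured.shutdown();
        }
    }
}
```

If no executor were supplied, the callback would run wherever the framework's own default pool decides, which is the situation described above.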







[GitHub] [cxf] rmannibucau commented on a change in pull request #683: CXF-8303: MP: Context propagation impossible using AsyncInvocationInterceptorFactory

Posted by GitBox <gi...@apache.org>.
rmannibucau commented on a change in pull request #683:
URL: https://github.com/apache/cxf/pull/683#discussion_r454807411



##########
File path: systests/microprofile/client/async/pom.xml
##########
@@ -76,6 +76,12 @@
                 <version>${cxf.johnzon.version}</version>
                 <scope>test</scope>
             </dependency>
+            <dependency>

Review comment:
       I think you can use `JsonbJaxrsProvider` from johnzon-jsonb instead of importing this one, which does not use JSON-B; that would probably be more consistent with MP?







[GitHub] [cxf] reta commented on a change in pull request #683: CXF-8303: MP: Context propagation impossible using AsyncInvocationInterceptorFactory

Posted by GitBox <gi...@apache.org>.
reta commented on a change in pull request #683:
URL: https://github.com/apache/cxf/pull/683#discussion_r455391971



##########
File path: rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MicroProfileClientFactoryBean.java
##########
@@ -63,7 +66,7 @@ public MicroProfileClientFactoryBean(MicroProfileClientConfigurableImpl<RestClie
         super(new MicroProfileServiceFactoryBean());
         this.configuration = configuration.getConfiguration();
         this.comparator = MicroProfileClientProviderFactory.createComparator(this);
-        this.executorService = executorService;
+        this.executorService = (executorService == null) ? defaultExecutorService() : executorService; 

Review comment:
       Thanks for the review! This one is very tricky to make lazy. Essentially, this `executorService` sets (or unsets) the CXF's `EXECUTOR_SERVICE_PROPERTY` which basically controls how the async client invocation should be handled (it is used by MP and other clients). If we don't provide `executorService` upfront, the property won't be set and async callback (`MPRestClientCallback`) is going to be called in the context of CXF's work queue pool. The common `ForkJoinPool` is present 99.9% of the time.







[GitHub] [cxf] rmannibucau commented on a change in pull request #683: CXF-8303: MP: Context propagation impossible using AsyncInvocationInterceptorFactory

Posted by GitBox <gi...@apache.org>.
rmannibucau commented on a change in pull request #683:
URL: https://github.com/apache/cxf/pull/683#discussion_r456759485



##########
File path: rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/Utils.java
##########
@@ -21,34 +21,109 @@
 
 import java.security.AccessController;
 import java.security.PrivilegedAction;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
+import org.apache.cxf.jaxrs.client.AbstractClient;
 import org.apache.cxf.jaxrs.ext.MessageContext;
-import org.apache.cxf.message.Message;
 
 public final class Utils {
 
     private Utils() {
     }
 
-    public static ExecutorService getExecutorService(Message message) {
-        ExecutorService es = message.get(ExecutorService.class);
+    public static ExecutorService getExecutorService(MessageContext mc) {
+        ExecutorService es = (ExecutorService) mc.get(AbstractClient.EXECUTOR_SERVICE_PROPERTY);
         if (es == null) {
-            es = AccessController.doPrivileged((PrivilegedAction<ExecutorService>)() -> {
-                return ForkJoinPool.commonPool();
-            });
+            es = getCommonPool();
         }
         return es;
     }
+    
+    public static ExecutorService defaultExecutorService() {
+        return new LazyForkJoinExecutor();
+    }
+    
+    private static class LazyForkJoinExecutor implements ExecutorService {
+        @Override
+        public void execute(Runnable command) {
+            getCommonPool().execute(command);
+        }
 
-    public static ExecutorService getExecutorService(MessageContext mc) {
-        ExecutorService es = (ExecutorService) mc.get(ExecutorService.class);
-        if (es == null) {
-            es = AccessController.doPrivileged((PrivilegedAction<ExecutorService>) () -> {
-                return ForkJoinPool.commonPool();
-            });
+        @Override
+        public void shutdown() {
+            getCommonPool().shutdown();
         }
-        return es;
+
+        @Override
+        public List<Runnable> shutdownNow() {
+            return getCommonPool().shutdownNow();
+        }
+
+        @Override
+        public boolean isShutdown() {
+            return getCommonPool().isShutdown();
+        }
+
+        @Override
+        public boolean isTerminated() {
+            return getCommonPool().isTerminated();
+        }
+
+        @Override
+        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+            return getCommonPool().awaitTermination(timeout, unit);
+        }
+
+        @Override
+        public <T> Future<T> submit(Callable<T> task) {
+            return getCommonPool().submit(task);
+        }
+
+        @Override
+        public <T> Future<T> submit(Runnable task, T result) {
+            return getCommonPool().submit(task, result);
+        }
+
+        @Override
+        public Future<?> submit(Runnable task) {
+            return getCommonPool().submit(task);
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+            return getCommonPool().invokeAll(tasks);
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout,
+                TimeUnit unit) throws InterruptedException {
+            return getCommonPool().invokeAll(tasks, timeout, unit);
+        }
+
+        @Override
+        public <T> T invokeAny(Collection<? extends Callable<T>> tasks) 
+                throws InterruptedException, ExecutionException {
+            return getCommonPool().invokeAny(tasks);
+        }
+
+        @Override
+        public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, 
+                TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+            return getCommonPool().invokeAny(tasks, timeout, unit);
+        }
+    }
+    
+    private static ExecutorService getCommonPool() {
+        return AccessController.doPrivileged((PrivilegedAction<ExecutorService>) () -> {

Review comment:
       I guess you want to bypass the `doPrivileged` call when there is no security manager.
   Otherwise it looks good to me now.
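
One possible shape of that suggestion, as a sketch only; it is not necessarily what was merged, and `CommonPoolAccess` is a hypothetical name. Note that the security manager APIs used here are deprecated for removal in recent JDKs, hence the suppressed warnings.

```java
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;

final class CommonPoolAccess {
    private CommonPoolAccess() {
    }

    @SuppressWarnings({"deprecation", "removal"})
    static ExecutorService getCommonPool() {
        // No security manager installed: the privileged wrapper adds nothing, so skip it.
        if (System.getSecurityManager() == null) {
            return ForkJoinPool.commonPool();
        }
        // Otherwise keep the privileged lookup, as in the original code.
        return AccessController.doPrivileged(
            (PrivilegedAction<ExecutorService>) ForkJoinPool::commonPool);
    }
}
```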







[GitHub] [cxf] reta merged pull request #683: CXF-8303: MP: Context propagation impossible using AsyncInvocationInterceptorFactory

Posted by GitBox <gi...@apache.org>.
reta merged pull request #683:
URL: https://github.com/apache/cxf/pull/683


   





[GitHub] [cxf] rmannibucau commented on a change in pull request #683: CXF-8303: MP: Context propagation impossible using AsyncInvocationInterceptorFactory

Posted by GitBox <gi...@apache.org>.
rmannibucau commented on a change in pull request #683:
URL: https://github.com/apache/cxf/pull/683#discussion_r455525270



##########
File path: rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MicroProfileClientFactoryBean.java
##########
@@ -63,7 +66,7 @@ public MicroProfileClientFactoryBean(MicroProfileClientConfigurableImpl<RestClie
         super(new MicroProfileServiceFactoryBean());
         this.configuration = configuration.getConfiguration();
         this.comparator = MicroProfileClientProviderFactory.createComparator(this);
-        this.executorService = executorService;
+        this.executorService = (executorService == null) ? defaultExecutorService() : executorService; 

Review comment:
       es = new LazyExecutor();
   
   `LazyExecutor` delegates 100% to the common one; it just makes it possible to not create threads until work is actually done, so it is the pool that is async, not the facing instance ;)







[GitHub] [cxf] reta commented on a change in pull request #683: CXF-8303: MP: Context propagation impossible using AsyncInvocationInterceptorFactory

Posted by GitBox <gi...@apache.org>.
reta commented on a change in pull request #683:
URL: https://github.com/apache/cxf/pull/683#discussion_r456118032



##########
File path: rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MicroProfileClientFactoryBean.java
##########
@@ -63,7 +66,7 @@ public MicroProfileClientFactoryBean(MicroProfileClientConfigurableImpl<RestClie
         super(new MicroProfileServiceFactoryBean());
         this.configuration = configuration.getConfiguration();
         this.comparator = MicroProfileClientProviderFactory.createComparator(this);
-        this.executorService = executorService;
+        this.executorService = (executorService == null) ? defaultExecutorService() : executorService; 

Review comment:
       Sorry, this part I didn't get. With respect to threads (aka workers), `ForkJoinPool` is 100% lazy: it does not preallocate threads upfront if there is no work to be done, the `ForkJoinPool`'s threads are created only when work is scheduled.
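
The on-demand thread creation can be observed with a small experiment. This is a sketch with a hypothetical class name; it uses a private `ForkJoinPool` rather than the common pool, because other code in the same JVM may already have used the common pool.

```java
import java.util.concurrent.ForkJoinPool;

final class ForkJoinLazinessDemo {
    // Returns {worker count before any task, worker count right after one task completed}.
    static int[] poolSizesAroundFirstTask() throws Exception {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            int before = pool.getPoolSize();   // no work scheduled yet
            pool.submit(() -> { }).get();      // schedule one trivial task and wait for it
            int after = pool.getPoolSize();    // at least one worker has been started
            return new int[] {before, after};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        int[] sizes = poolSizesAroundFirstTask();
        System.out.println("before=" + sizes[0] + ", after=" + sizes[1]);
    }
}
```

This only shows that worker threads are created lazily; it says nothing about the cost of initializing the pool object itself, which is the separate concern raised in this review.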







[GitHub] [cxf] reta commented on a change in pull request #683: CXF-8303: MP: Context propagation impossible using AsyncInvocationInterceptorFactory

Posted by GitBox <gi...@apache.org>.
reta commented on a change in pull request #683:
URL: https://github.com/apache/cxf/pull/683#discussion_r456791803



##########
File path: rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/Utils.java
##########
@@ -21,34 +21,109 @@
 
 import java.security.AccessController;
 import java.security.PrivilegedAction;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
+import org.apache.cxf.jaxrs.client.AbstractClient;
 import org.apache.cxf.jaxrs.ext.MessageContext;
-import org.apache.cxf.message.Message;
 
 public final class Utils {
 
     private Utils() {
     }
 
-    public static ExecutorService getExecutorService(Message message) {
-        ExecutorService es = message.get(ExecutorService.class);
+    public static ExecutorService getExecutorService(MessageContext mc) {
+        ExecutorService es = (ExecutorService) mc.get(AbstractClient.EXECUTOR_SERVICE_PROPERTY);
         if (es == null) {
-            es = AccessController.doPrivileged((PrivilegedAction<ExecutorService>)() -> {
-                return ForkJoinPool.commonPool();
-            });
+            es = getCommonPool();
         }
         return es;
     }
+    
+    public static ExecutorService defaultExecutorService() {
+        return new LazyForkJoinExecutor();
+    }
+    
+    private static class LazyForkJoinExecutor implements ExecutorService {
+        @Override
+        public void execute(Runnable command) {
+            getCommonPool().execute(command);
+        }
 
-    public static ExecutorService getExecutorService(MessageContext mc) {
-        ExecutorService es = (ExecutorService) mc.get(ExecutorService.class);
-        if (es == null) {
-            es = AccessController.doPrivileged((PrivilegedAction<ExecutorService>) () -> {
-                return ForkJoinPool.commonPool();
-            });
+        @Override
+        public void shutdown() {
+            getCommonPool().shutdown();
         }
-        return es;
+
+        @Override
+        public List<Runnable> shutdownNow() {
+            return getCommonPool().shutdownNow();
+        }
+
+        @Override
+        public boolean isShutdown() {
+            return getCommonPool().isShutdown();
+        }
+
+        @Override
+        public boolean isTerminated() {
+            return getCommonPool().isTerminated();
+        }
+
+        @Override
+        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+            return getCommonPool().awaitTermination(timeout, unit);
+        }
+
+        @Override
+        public <T> Future<T> submit(Callable<T> task) {
+            return getCommonPool().submit(task);
+        }
+
+        @Override
+        public <T> Future<T> submit(Runnable task, T result) {
+            return getCommonPool().submit(task, result);
+        }
+
+        @Override
+        public Future<?> submit(Runnable task) {
+            return getCommonPool().submit(task);
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+            return getCommonPool().invokeAll(tasks);
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout,
+                TimeUnit unit) throws InterruptedException {
+            return getCommonPool().invokeAll(tasks, timeout, unit);
+        }
+
+        @Override
+        public <T> T invokeAny(Collection<? extends Callable<T>> tasks) 
+                throws InterruptedException, ExecutionException {
+            return getCommonPool().invokeAny(tasks);
+        }
+
+        @Override
+        public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, 
+                TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+            return getCommonPool().invokeAny(tasks, timeout, unit);
+        }
+    }
+    
+    private static ExecutorService getCommonPool() {
+        return AccessController.doPrivileged((PrivilegedAction<ExecutorService>) () -> {

Review comment:
       :+1: done, thanks!







[GitHub] [cxf] rmannibucau commented on a change in pull request #683: CXF-8303: MP: Context propagation impossible using AsyncInvocationInterceptorFactory

Posted by GitBox <gi...@apache.org>.
rmannibucau commented on a change in pull request #683:
URL: https://github.com/apache/cxf/pull/683#discussion_r454807050



##########
File path: rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MicroProfileClientFactoryBean.java
##########
@@ -63,7 +66,7 @@ public MicroProfileClientFactoryBean(MicroProfileClientConfigurableImpl<RestClie
         super(new MicroProfileServiceFactoryBean());
         this.configuration = configuration.getConfiguration();
         this.comparator = MicroProfileClientProviderFactory.createComparator(this);
-        this.executorService = executorService;
+        this.executorService = (executorService == null) ? defaultExecutorService() : executorService; 

Review comment:
       Maybe make it lazy, so that the ForkJoin common pool is not created if the app does not use it and the client for its lifecycle?







[GitHub] [cxf] reta commented on pull request #683: CXF-8303: MP: Context propagation impossible using AsyncInvocationInterceptorFactory

Posted by GitBox <gi...@apache.org>.
reta commented on pull request #683:
URL: https://github.com/apache/cxf/pull/683#issuecomment-659714742


   Thanks a lot, @andymc12 !





[GitHub] [cxf] reta commented on a change in pull request #683: CXF-8303: MP: Context propagation impossible using AsyncInvocationInterceptorFactory

Posted by GitBox <gi...@apache.org>.
reta commented on a change in pull request #683:
URL: https://github.com/apache/cxf/pull/683#discussion_r455391971



##########
File path: rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MicroProfileClientFactoryBean.java
##########
@@ -63,7 +66,7 @@ public MicroProfileClientFactoryBean(MicroProfileClientConfigurableImpl<RestClie
         super(new MicroProfileServiceFactoryBean());
         this.configuration = configuration.getConfiguration();
         this.comparator = MicroProfileClientProviderFactory.createComparator(this);
-        this.executorService = executorService;
+        this.executorService = (executorService == null) ? defaultExecutorService() : executorService; 

Review comment:
       Thanks for the review! This one is very tricky to make lazy. Essentially, this `executorService` sets (or unsets) the CXF's `EXECUTOR_SERVICE_PROPERTY` which basically controls how the async client invocation should be handled (it is used by MP and other clients). If we don't provide `executorService` upfront, the property won't be set and async callback (`MPRestClientCallback`) is going to be called in the context of CXF's work queue pool. The common `ForkJoinPool` is present 99.9% of the time.



