You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2020/07/19 13:07:43 UTC

[cxf] branch master updated: CXF-8303: MP: Context propagation impossible using AsyncInvocationInterceptorFactory (#683)

This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cxf.git


The following commit(s) were added to refs/heads/master by this push:
     new 2244024  CXF-8303: MP: Context propagation impossible using AsyncInvocationInterceptorFactory (#683)
2244024 is described below

commit 2244024ba6d438f44adffd56b1dc8e02cb45cf71
Author: Andriy Redko <dr...@gmail.com>
AuthorDate: Sun Jul 19 09:07:34 2020 -0400

    CXF-8303: MP: Context propagation impossible using AsyncInvocationInterceptorFactory (#683)
    
    CXF-8303: MP: Context propagation impossible using AsyncInvocationInterceptorFactory
---
 .../microprofile/client/MPRestClientCallback.java  |   6 +-
 .../client/MicroProfileClientFactoryBean.java      |   2 +-
 .../org/apache/cxf/microprofile/client/Utils.java  | 101 ++++++-
 .../client/proxy/MicroProfileClientProxyImpl.java  |   5 +
 systests/microprofile/client/async/pom.xml         |   6 +
 .../rest/client/AsyncThreadingTest.java            | 296 +++++++++++++++++++++
 6 files changed, 399 insertions(+), 17 deletions(-)

diff --git a/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MPRestClientCallback.java b/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MPRestClientCallback.java
index 7113efe..83226db 100644
--- a/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MPRestClientCallback.java
+++ b/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MPRestClientCallback.java
@@ -20,7 +20,6 @@
 package org.apache.cxf.microprofile.client;
 
 import java.lang.reflect.Type;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
 import javax.ws.rs.client.InvocationCallback;
@@ -29,19 +28,16 @@ import org.apache.cxf.jaxrs.client.JaxrsClientCallback;
 import org.apache.cxf.message.Message;
 
 public class MPRestClientCallback<T> extends JaxrsClientCallback<T> {
-    private final ExecutorService executor;
-
     public MPRestClientCallback(InvocationCallback<T> handler,
                                 Message outMessage,
                                 Class<?> responseClass,
                                 Type outGenericType) {
         super(handler, responseClass, outGenericType);
-        executor = Utils.getExecutorService(outMessage);
     }
 
     @SuppressWarnings("unchecked")
     @Override
     public Future<T> createFuture() {
-        return delegate.thenApplyAsync(res -> (T)res[0], executor);
+        return delegate.thenApply(res -> (T)res[0]);
     }
 }
\ No newline at end of file
diff --git a/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MicroProfileClientFactoryBean.java b/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MicroProfileClientFactoryBean.java
index 2b7fa7e..389c4fa 100644
--- a/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MicroProfileClientFactoryBean.java
+++ b/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MicroProfileClientFactoryBean.java
@@ -63,7 +63,7 @@ public class MicroProfileClientFactoryBean extends JAXRSClientFactoryBean {
         super(new MicroProfileServiceFactoryBean());
         this.configuration = configuration.getConfiguration();
         this.comparator = MicroProfileClientProviderFactory.createComparator(this);
-        this.executorService = executorService;
+        this.executorService = (executorService == null) ? Utils.defaultExecutorService() : executorService; 
         this.secConfig = secConfig;
         super.setAddress(baseUri);
         super.setServiceClass(aClass);
diff --git a/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/Utils.java b/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/Utils.java
index 94ebdb1..7b1edad 100644
--- a/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/Utils.java
+++ b/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/Utils.java
@@ -21,34 +21,113 @@ package org.apache.cxf.microprofile.client;
 
 import java.security.AccessController;
 import java.security.PrivilegedAction;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
+import org.apache.cxf.jaxrs.client.AbstractClient;
 import org.apache.cxf.jaxrs.ext.MessageContext;
-import org.apache.cxf.message.Message;
 
 public final class Utils {
 
     private Utils() {
     }
 
-    public static ExecutorService getExecutorService(Message message) {
-        ExecutorService es = message.get(ExecutorService.class);
+    public static ExecutorService getExecutorService(MessageContext mc) {
+        ExecutorService es = (ExecutorService) mc.get(AbstractClient.EXECUTOR_SERVICE_PROPERTY);
         if (es == null) {
-            es = AccessController.doPrivileged((PrivilegedAction<ExecutorService>)() -> {
-                return ForkJoinPool.commonPool();
-            });
+            es = getCommonPool();
         }
         return es;
     }
+    
+    public static ExecutorService defaultExecutorService() {
+        return new LazyForkJoinExecutor();
+    }
+    
+    private static class LazyForkJoinExecutor implements ExecutorService {
+        @Override
+        public void execute(Runnable command) {
+            getCommonPool().execute(command);
+        }
 
-    public static ExecutorService getExecutorService(MessageContext mc) {
-        ExecutorService es = (ExecutorService) mc.get(ExecutorService.class);
-        if (es == null) {
-            es = AccessController.doPrivileged((PrivilegedAction<ExecutorService>) () -> {
+        @Override
+        public void shutdown() {
+            getCommonPool().shutdown();
+        }
+
+        @Override
+        public List<Runnable> shutdownNow() {
+            return getCommonPool().shutdownNow();
+        }
+
+        @Override
+        public boolean isShutdown() {
+            return getCommonPool().isShutdown();
+        }
+
+        @Override
+        public boolean isTerminated() {
+            return getCommonPool().isTerminated();
+        }
+
+        @Override
+        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+            return getCommonPool().awaitTermination(timeout, unit);
+        }
+
+        @Override
+        public <T> Future<T> submit(Callable<T> task) {
+            return getCommonPool().submit(task);
+        }
+
+        @Override
+        public <T> Future<T> submit(Runnable task, T result) {
+            return getCommonPool().submit(task, result);
+        }
+
+        @Override
+        public Future<?> submit(Runnable task) {
+            return getCommonPool().submit(task);
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+            return getCommonPool().invokeAll(tasks);
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout,
+                TimeUnit unit) throws InterruptedException {
+            return getCommonPool().invokeAll(tasks, timeout, unit);
+        }
+
+        @Override
+        public <T> T invokeAny(Collection<? extends Callable<T>> tasks) 
+                throws InterruptedException, ExecutionException {
+            return getCommonPool().invokeAny(tasks);
+        }
+
+        @Override
+        public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, 
+                TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+            return getCommonPool().invokeAny(tasks, timeout, unit);
+        }
+    }
+    
+    private static ExecutorService getCommonPool() {
+        if (System.getSecurityManager() != null) {
+            return AccessController.doPrivileged((PrivilegedAction<ExecutorService>) () -> {
                 return ForkJoinPool.commonPool();
             });
+        } else {
+            return ForkJoinPool.commonPool();
         }
-        return es;
     }
 }
\ No newline at end of file
diff --git a/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/proxy/MicroProfileClientProxyImpl.java b/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/proxy/MicroProfileClientProxyImpl.java
index 35b62bc..48e392d 100644
--- a/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/proxy/MicroProfileClientProxyImpl.java
+++ b/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/proxy/MicroProfileClientProxyImpl.java
@@ -114,6 +114,11 @@ public class MicroProfileClientProxyImpl extends ClientProxyImpl {
         super(new LocalClientState(baseURI, configuration.getProperties()), loader, cri,
             isRoot, inheritHeaders, varValues);
         this.interceptorWrapper = interceptorWrapper;
+        
+        if (executorService == null) {
+            throw new IllegalArgumentException("The executorService is required and must be provided");
+        }
+        
         init(executorService, configuration);
     }
 
diff --git a/systests/microprofile/client/async/pom.xml b/systests/microprofile/client/async/pom.xml
index bd42776..4b2c69a 100644
--- a/systests/microprofile/client/async/pom.xml
+++ b/systests/microprofile/client/async/pom.xml
@@ -77,6 +77,12 @@
                 <scope>test</scope>
             </dependency>
             <dependency>
+                <groupId>org.apache.geronimo.specs</groupId>
+                <artifactId>geronimo-jsonb_1.0_spec</artifactId>
+                <version>${cxf.geronimo.jsonb.version}</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
                 <groupId>jakarta.json</groupId>
                 <artifactId>jakarta.json-api</artifactId>
                 <version>${cxf.json.api.version}</version>
diff --git a/systests/microprofile/client/async/src/test/java/org/apache/cxf/systest/microprofile/rest/client/AsyncThreadingTest.java b/systests/microprofile/client/async/src/test/java/org/apache/cxf/systest/microprofile/rest/client/AsyncThreadingTest.java
new file mode 100644
index 0000000..17ead61
--- /dev/null
+++ b/systests/microprofile/client/async/src/test/java/org/apache/cxf/systest/microprofile/rest/client/AsyncThreadingTest.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.microprofile.rest.client;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+
+import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
+import com.github.tomakehurst.wiremock.junit.WireMockRule;
+
+import org.apache.johnzon.jaxrs.jsonb.jaxrs.JsonbJaxrsProvider;
+import org.eclipse.microprofile.rest.client.RestClientBuilder;
+import org.eclipse.microprofile.rest.client.ext.AsyncInvocationInterceptor;
+import org.eclipse.microprofile.rest.client.ext.AsyncInvocationInterceptorFactory;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.startsWith;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+@RunWith(Parameterized.class)
+public class AsyncThreadingTest {
+    private static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();
+    
+    @Rule
+    public WireMockRule wireMockRule = new WireMockRule(WireMockConfiguration.options().dynamicPort());
+    
+    private final ExecutorService executorService;
+    private final String prefix;
+    private EchoResource echo;
+    
+    public AsyncThreadingTest(final ExecutorService executorService, final String prefix) {
+        this.executorService = executorService;
+        this.prefix = prefix;
+    }
+
+    @Parameters(name = "Using pool: {1}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] 
+        {
+            {cachedExecutor(), "mp-async-"}, 
+            {null, "ForkJoinPool.commonPool-worker-"}
+        });
+    }
+        
+    @Before
+    public void setUp() {
+        final RestClientBuilder builder = RestClientBuilder
+            .newBuilder()
+            .register(JsonbJaxrsProvider.class)
+            .register(AsyncInvocationInterceptorFactoryImpl.class)
+            .baseUri(getBaseUri());
+        
+        if (executorService == null /* use default one */) {
+            echo = builder.build(EchoResource.class);
+        } else {
+            echo = builder.executorService(executorService).build(EchoResource.class);
+        }
+    }
+    
+    @After
+    public void tearDown() {
+        CONTEXT.remove();
+    }
+
+    @Test
+    public void testAsynchronousNotFoundCall() throws Exception {
+        wireMockRule.stubFor(get(urlEqualTo("/echo"))
+            .willReturn(aResponse()
+            .withStatus(404)));
+
+        final CompletableFuture<Echo> future = echo
+            .getAsync()
+            .toCompletableFuture()
+            .handle((r, ex) -> {
+                try {
+                    Thread.sleep(500);
+                    assertThat(Thread.currentThread().getName(), startsWith(prefix));
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+
+                if (ex instanceof CompletionException) {
+                    throw (CompletionException)ex;
+                } else {
+                    return r;
+                }
+            });
+
+        // The handler above sleeps for 500 ms, so the future must not be completed yet
+        assertNull(future.getNow(null));
+        
+        final ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get(5L, TimeUnit.SECONDS));
+        assertEquals(WebApplicationException.class, ex.getCause().getClass());
+    }
+
+    @Test
+    public void testAsynchronousCall() throws Exception {
+        wireMockRule.stubFor(get(urlEqualTo("/echo"))
+            .willReturn(aResponse()
+            .withHeader("Content-Type", MediaType.APPLICATION_JSON)
+            .withBody("{ \"message\": \"echo\" }")));
+
+        final CompletableFuture<Echo> future = echo
+            .getAsync()
+            .toCompletableFuture()
+            .thenApply(s -> {
+                try {
+                    Thread.sleep(500);
+                    assertThat(Thread.currentThread().getName(), startsWith(prefix));
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+                
+                return s;
+            });
+        
+        assertNull(future.getNow(null));
+        
+        final Echo result = future.get(5L, TimeUnit.SECONDS);
+        assertThat(result.getMessage(), equalTo("echo"));
+    }
+
+    @Test
+    public void testAsynchronousCallAndContextPropagation() throws Exception {
+        wireMockRule.stubFor(get(urlEqualTo("/echo"))
+            .willReturn(aResponse()
+            .withHeader("Content-Type", MediaType.APPLICATION_JSON)
+            .withBody("{ \"message\": \"echo\" }")));
+
+        CONTEXT.set("context-value");
+        
+        final CompletableFuture<Echo> future = echo
+            .getAsync()
+            .toCompletableFuture()
+            .thenApply(s -> {
+                try {
+                    Thread.sleep(500);
+                    assertThat(Thread.currentThread().getName(), startsWith(prefix));
+                    assertThat(CONTEXT.get(), equalTo("context-value"));
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+
+                return s;
+            });
+
+        final Echo result = future.get(5L, TimeUnit.SECONDS);
+        assertThat(result.getMessage(), equalTo("echo"));
+    }
+
+    @Test
+    public void testAsynchronousCallMany() throws InterruptedException, ExecutionException, TimeoutException {
+        wireMockRule.stubFor(get(urlEqualTo("/echo"))
+            .willReturn(aResponse()
+            .withHeader("Content-Type", MediaType.APPLICATION_JSON)
+            .withBody("{ \"message\": \"echo\" }")));
+
+        final Collection<CompletableFuture<Echo>> futures = new ArrayList<>();
+        for (int i = 0; i < 20; ++i) {
+            futures.add(
+                echo
+                    .getAsync()
+                    .toCompletableFuture()
+                    .thenApply(s -> {
+                        try {
+                            Thread.sleep(500);
+                            assertThat(Thread.currentThread().getName(), startsWith(prefix));
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                        }
+    
+                        return s;
+                    })
+            );
+        }
+
+        CompletableFuture
+            .allOf(futures.toArray(new CompletableFuture[0]))
+            .join();
+        
+        for (final CompletableFuture<Echo> future: futures) {
+            assertThat(future.get().getMessage(), equalTo("echo"));
+        }
+    }
+    
+    private URI getBaseUri() {
+        return URI.create("http://localhost:" + wireMockRule.port() + "/echo");
+    }
+      
+    public static class Echo {
+        private String message;
+          
+        public String getMessage() {
+            return message;
+        }
+          
+        public void setMessage(String message) {
+            this.message = message;
+        }
+    }
+      
+    @Path("/")
+    public interface EchoResource {
+        @GET
+        @Produces(MediaType.APPLICATION_JSON)
+        @Consumes(MediaType.APPLICATION_JSON)
+        CompletionStage<Echo> getAsync();
+    }
+      
+    public static class AsyncInvocationInterceptorFactoryImpl implements AsyncInvocationInterceptorFactory {
+        @Override
+        public AsyncInvocationInterceptor newInterceptor() {
+            return new AsyncInvocationInterceptorImpl();
+        }
+    }
+
+    public static class AsyncInvocationInterceptorImpl implements AsyncInvocationInterceptor {
+        private String context;
+
+        @Override
+        public void prepareContext() {
+            context = CONTEXT.get();
+        }
+
+        @Override
+        public void applyContext() {
+            CONTEXT.set(context);
+        }
+
+        @Override
+        public void removeContext() {
+            CONTEXT.remove();
+        }
+    }
+    
+    private static ExecutorService cachedExecutor() {
+        return Executors.newCachedThreadPool(new ThreadFactory() {
+            private AtomicInteger counter = new AtomicInteger();
+             
+            @Override
+            public Thread newThread(Runnable r) {
+                return new Thread(r, "mp-async-" + counter.incrementAndGet());
+            }
+        });
+    }
+}