You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2020/07/19 13:07:43 UTC
[cxf] branch master updated: CXF-8303: MP: Context propagation
impossible using AsyncInvocationInterceptorFactory (#683)
This is an automated email from the ASF dual-hosted git repository.
reta pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cxf.git
The following commit(s) were added to refs/heads/master by this push:
new 2244024 CXF-8303: MP: Context propagation impossible using AsyncInvocationInterceptorFactory (#683)
2244024 is described below
commit 2244024ba6d438f44adffd56b1dc8e02cb45cf71
Author: Andriy Redko <dr...@gmail.com>
AuthorDate: Sun Jul 19 09:07:34 2020 -0400
CXF-8303: MP: Context propagation impossible using AsyncInvocationInterceptorFactory (#683)
CXF-8303: MP: Context propagation impossible using AsyncInvocationInterceptorFactory
---
.../microprofile/client/MPRestClientCallback.java | 6 +-
.../client/MicroProfileClientFactoryBean.java | 2 +-
.../org/apache/cxf/microprofile/client/Utils.java | 101 ++++++-
.../client/proxy/MicroProfileClientProxyImpl.java | 5 +
systests/microprofile/client/async/pom.xml | 6 +
.../rest/client/AsyncThreadingTest.java | 296 +++++++++++++++++++++
6 files changed, 399 insertions(+), 17 deletions(-)
diff --git a/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MPRestClientCallback.java b/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MPRestClientCallback.java
index 7113efe..83226db 100644
--- a/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MPRestClientCallback.java
+++ b/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MPRestClientCallback.java
@@ -20,7 +20,6 @@
package org.apache.cxf.microprofile.client;
import java.lang.reflect.Type;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import javax.ws.rs.client.InvocationCallback;
@@ -29,19 +28,16 @@ import org.apache.cxf.jaxrs.client.JaxrsClientCallback;
import org.apache.cxf.message.Message;
public class MPRestClientCallback<T> extends JaxrsClientCallback<T> {
- private final ExecutorService executor;
-
public MPRestClientCallback(InvocationCallback<T> handler,
Message outMessage,
Class<?> responseClass,
Type outGenericType) {
super(handler, responseClass, outGenericType);
- executor = Utils.getExecutorService(outMessage);
}
@SuppressWarnings("unchecked")
@Override
public Future<T> createFuture() {
- return delegate.thenApplyAsync(res -> (T)res[0], executor);
+ return delegate.thenApply(res -> (T)res[0]);
}
}
\ No newline at end of file
diff --git a/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MicroProfileClientFactoryBean.java b/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MicroProfileClientFactoryBean.java
index 2b7fa7e..389c4fa 100644
--- a/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MicroProfileClientFactoryBean.java
+++ b/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MicroProfileClientFactoryBean.java
@@ -63,7 +63,7 @@ public class MicroProfileClientFactoryBean extends JAXRSClientFactoryBean {
super(new MicroProfileServiceFactoryBean());
this.configuration = configuration.getConfiguration();
this.comparator = MicroProfileClientProviderFactory.createComparator(this);
- this.executorService = executorService;
+ this.executorService = (executorService == null) ? Utils.defaultExecutorService() : executorService;
this.secConfig = secConfig;
super.setAddress(baseUri);
super.setServiceClass(aClass);
diff --git a/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/Utils.java b/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/Utils.java
index 94ebdb1..7b1edad 100644
--- a/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/Utils.java
+++ b/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/Utils.java
@@ -21,34 +21,113 @@ package org.apache.cxf.microprofile.client;
import java.security.AccessController;
import java.security.PrivilegedAction;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.cxf.jaxrs.client.AbstractClient;
import org.apache.cxf.jaxrs.ext.MessageContext;
-import org.apache.cxf.message.Message;
public final class Utils {
private Utils() {
}
- public static ExecutorService getExecutorService(Message message) {
- ExecutorService es = message.get(ExecutorService.class);
+ public static ExecutorService getExecutorService(MessageContext mc) {
+ ExecutorService es = (ExecutorService) mc.get(AbstractClient.EXECUTOR_SERVICE_PROPERTY);
if (es == null) {
- es = AccessController.doPrivileged((PrivilegedAction<ExecutorService>)() -> {
- return ForkJoinPool.commonPool();
- });
+ es = getCommonPool();
}
return es;
}
+
+ public static ExecutorService defaultExecutorService() {
+ return new LazyForkJoinExecutor();
+ }
+
+ private static class LazyForkJoinExecutor implements ExecutorService {
+ @Override
+ public void execute(Runnable command) {
+ getCommonPool().execute(command);
+ }
- public static ExecutorService getExecutorService(MessageContext mc) {
- ExecutorService es = (ExecutorService) mc.get(ExecutorService.class);
- if (es == null) {
- es = AccessController.doPrivileged((PrivilegedAction<ExecutorService>) () -> {
+ @Override
+ public void shutdown() {
+ getCommonPool().shutdown();
+ }
+
+ @Override
+ public List<Runnable> shutdownNow() {
+ return getCommonPool().shutdownNow();
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return getCommonPool().isShutdown();
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return getCommonPool().isTerminated();
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ return getCommonPool().awaitTermination(timeout, unit);
+ }
+
+ @Override
+ public <T> Future<T> submit(Callable<T> task) {
+ return getCommonPool().submit(task);
+ }
+
+ @Override
+ public <T> Future<T> submit(Runnable task, T result) {
+ return getCommonPool().submit(task, result);
+ }
+
+ @Override
+ public Future<?> submit(Runnable task) {
+ return getCommonPool().submit(task);
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+ return getCommonPool().invokeAll(tasks);
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout,
+ TimeUnit unit) throws InterruptedException {
+ return getCommonPool().invokeAll(tasks, timeout, unit);
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+ throws InterruptedException, ExecutionException {
+ return getCommonPool().invokeAny(tasks);
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout,
+ TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ return getCommonPool().invokeAny(tasks, timeout, unit);
+ }
+ }
+
+ private static ExecutorService getCommonPool() {
+ if (System.getSecurityManager() != null) {
+ return AccessController.doPrivileged((PrivilegedAction<ExecutorService>) () -> {
return ForkJoinPool.commonPool();
});
+ } else {
+ return ForkJoinPool.commonPool();
}
- return es;
}
}
\ No newline at end of file
diff --git a/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/proxy/MicroProfileClientProxyImpl.java b/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/proxy/MicroProfileClientProxyImpl.java
index 35b62bc..48e392d 100644
--- a/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/proxy/MicroProfileClientProxyImpl.java
+++ b/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/proxy/MicroProfileClientProxyImpl.java
@@ -114,6 +114,11 @@ public class MicroProfileClientProxyImpl extends ClientProxyImpl {
super(new LocalClientState(baseURI, configuration.getProperties()), loader, cri,
isRoot, inheritHeaders, varValues);
this.interceptorWrapper = interceptorWrapper;
+
+ if (executorService == null) {
+ throw new IllegalArgumentException("The executorService is required and must be provided");
+ }
+
init(executorService, configuration);
}
diff --git a/systests/microprofile/client/async/pom.xml b/systests/microprofile/client/async/pom.xml
index bd42776..4b2c69a 100644
--- a/systests/microprofile/client/async/pom.xml
+++ b/systests/microprofile/client/async/pom.xml
@@ -77,6 +77,12 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-jsonb_1.0_spec</artifactId>
+ <version>${cxf.geronimo.jsonb.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>jakarta.json</groupId>
<artifactId>jakarta.json-api</artifactId>
<version>${cxf.json.api.version}</version>
diff --git a/systests/microprofile/client/async/src/test/java/org/apache/cxf/systest/microprofile/rest/client/AsyncThreadingTest.java b/systests/microprofile/client/async/src/test/java/org/apache/cxf/systest/microprofile/rest/client/AsyncThreadingTest.java
new file mode 100644
index 0000000..17ead61
--- /dev/null
+++ b/systests/microprofile/client/async/src/test/java/org/apache/cxf/systest/microprofile/rest/client/AsyncThreadingTest.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.microprofile.rest.client;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+
+import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
+import com.github.tomakehurst.wiremock.junit.WireMockRule;
+
+import org.apache.johnzon.jaxrs.jsonb.jaxrs.JsonbJaxrsProvider;
+import org.eclipse.microprofile.rest.client.RestClientBuilder;
+import org.eclipse.microprofile.rest.client.ext.AsyncInvocationInterceptor;
+import org.eclipse.microprofile.rest.client.ext.AsyncInvocationInterceptorFactory;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.startsWith;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+@RunWith(Parameterized.class)
+public class AsyncThreadingTest {
+ private static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();
+
+ @Rule
+ public WireMockRule wireMockRule = new WireMockRule(WireMockConfiguration.options().dynamicPort());
+
+ private final ExecutorService executorService;
+ private final String prefix;
+ private EchoResource echo;
+
+ public AsyncThreadingTest(final ExecutorService executorService, final String prefix) {
+ this.executorService = executorService;
+ this.prefix = prefix;
+ }
+
+ @Parameters(name = "Using pool: {1}")
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][]
+ {
+ {cachedExecutor(), "mp-async-"},
+ {null, "ForkJoinPool.commonPool-worker-"}
+ });
+ }
+
+ @Before
+ public void setUp() {
+ final RestClientBuilder builder = RestClientBuilder
+ .newBuilder()
+ .register(JsonbJaxrsProvider.class)
+ .register(AsyncInvocationInterceptorFactoryImpl.class)
+ .baseUri(getBaseUri());
+
+ if (executorService == null /* use default one */) {
+ echo = builder.build(EchoResource.class);
+ } else {
+ echo = builder.executorService(executorService).build(EchoResource.class);
+ }
+ }
+
+ @After
+ public void tearDown() {
+ CONTEXT.remove();
+ }
+
+ @Test
+ public void testAsynchronousNotFoundCall() throws Exception {
+ wireMockRule.stubFor(get(urlEqualTo("/echo"))
+ .willReturn(aResponse()
+ .withStatus(404)));
+
+ final CompletableFuture<Echo> future = echo
+ .getAsync()
+ .toCompletableFuture()
+ .handle((r, ex) -> {
+ try {
+ Thread.sleep(500);
+ assertThat(Thread.currentThread().getName(), startsWith(prefix));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+
+ if (ex instanceof CompletionException) {
+ throw (CompletionException)ex;
+ } else {
+ return r;
+ }
+ });
+
+ // Simulate some processing pause
+ assertNull(future.getNow(null));
+
+ final ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get(5L, TimeUnit.SECONDS));
+ assertEquals(WebApplicationException.class, ex.getCause().getClass());
+ }
+
+ @Test
+ public void testAsynchronousCall() throws Exception {
+ wireMockRule.stubFor(get(urlEqualTo("/echo"))
+ .willReturn(aResponse()
+ .withHeader("Content-Type", MediaType.APPLICATION_JSON)
+ .withBody("{ \"message\": \"echo\" }")));
+
+ final CompletableFuture<Echo> future = echo
+ .getAsync()
+ .toCompletableFuture()
+ .thenApply(s -> {
+ try {
+ Thread.sleep(500);
+ assertThat(Thread.currentThread().getName(), startsWith(prefix));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+
+ return s;
+ });
+
+ assertNull(future.getNow(null));
+
+ final Echo result = future.get(5L, TimeUnit.SECONDS);
+ assertThat(result.getMessage(), equalTo("echo"));
+ }
+
+ @Test
+ public void testAsynchronousCallAndContextPropagation() throws Exception {
+ wireMockRule.stubFor(get(urlEqualTo("/echo"))
+ .willReturn(aResponse()
+ .withHeader("Content-Type", MediaType.APPLICATION_JSON)
+ .withBody("{ \"message\": \"echo\" }")));
+
+ CONTEXT.set("context-value");
+
+ final CompletableFuture<Echo> future = echo
+ .getAsync()
+ .toCompletableFuture()
+ .thenApply(s -> {
+ try {
+ Thread.sleep(500);
+ assertThat(Thread.currentThread().getName(), startsWith(prefix));
+ assertThat(CONTEXT.get(), equalTo("context-value"));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+
+ return s;
+ });
+
+ final Echo result = future.get(5L, TimeUnit.SECONDS);
+ assertThat(result.getMessage(), equalTo("echo"));
+ }
+
+ @Test
+ public void testAsynchronousCallMany() throws InterruptedException, ExecutionException, TimeoutException {
+ wireMockRule.stubFor(get(urlEqualTo("/echo"))
+ .willReturn(aResponse()
+ .withHeader("Content-Type", MediaType.APPLICATION_JSON)
+ .withBody("{ \"message\": \"echo\" }")));
+
+ final Collection<CompletableFuture<Echo>> futures = new ArrayList<>();
+ for (int i = 0; i < 20; ++i) {
+ futures.add(
+ echo
+ .getAsync()
+ .toCompletableFuture()
+ .thenApply(s -> {
+ try {
+ Thread.sleep(500);
+ assertThat(Thread.currentThread().getName(), startsWith(prefix));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+
+ return s;
+ })
+ );
+ }
+
+ CompletableFuture
+ .allOf(futures.toArray(new CompletableFuture[0]))
+ .join();
+
+ for (final CompletableFuture<Echo> future: futures) {
+ assertThat(future.get().getMessage(), equalTo("echo"));
+ }
+ }
+
+ private URI getBaseUri() {
+ return URI.create("http://localhost:" + wireMockRule.port() + "/echo");
+ }
+
+ public static class Echo {
+ private String message;
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+ }
+
+ @Path("/")
+ public interface EchoResource {
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Consumes(MediaType.APPLICATION_JSON)
+ CompletionStage<Echo> getAsync();
+ }
+
+ public static class AsyncInvocationInterceptorFactoryImpl implements AsyncInvocationInterceptorFactory {
+ @Override
+ public AsyncInvocationInterceptor newInterceptor() {
+ return new AsyncInvocationInterceptorImpl();
+ }
+ }
+
+ public static class AsyncInvocationInterceptorImpl implements AsyncInvocationInterceptor {
+ private String context;
+
+ @Override
+ public void prepareContext() {
+ context = CONTEXT.get();
+ }
+
+ @Override
+ public void applyContext() {
+ CONTEXT.set(context);
+ }
+
+ @Override
+ public void removeContext() {
+ CONTEXT.remove();
+ }
+ }
+
+ private static ExecutorService cachedExecutor() {
+ return Executors.newCachedThreadPool(new ThreadFactory() {
+ private AtomicInteger counter = new AtomicInteger();
+
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "mp-async-" + counter.incrementAndGet());
+ }
+ });
+ }
+}