You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by am...@apache.org on 2019/07/30 14:07:41 UTC
[cxf] branch master updated: [CXF-8080] Ensure stages from async methods are completed
This is an automated email from the ASF dual-hosted git repository.
amccright pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cxf.git
The following commit(s) were added to refs/heads/master by this push:
new a0863e6 [CXF-8080] Ensure stages from async methods are completed
new 6bff3cb Merge pull request #570 from andymc12/8080-asyncStages
a0863e6 is described below
commit a0863e63b2c874307b2a8401a2dccbd95b724a3e
Author: Andy McCright <j....@gmail.com>
AuthorDate: Mon Jul 29 09:50:52 2019 -0500
[CXF-8080] Ensure stages from async methods are completed
---
.../apache/cxf/jaxrs/client/ClientProxyImpl.java | 3 +-
rt/rs/microprofile-client/pom.xml | 6 ++
.../microprofile/client/MPRestClientCallback.java | 86 ++++++++--------------
.../client/proxy/MicroProfileClientProxyImpl.java | 3 +-
.../apache/cxf/microprofile/client/AsyncTest.java | 56 ++++++++++++++
.../cxf/microprofile/client/mock/AsyncClient.java | 35 +++++++++
6 files changed, 133 insertions(+), 56 deletions(-)
diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/ClientProxyImpl.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/ClientProxyImpl.java
index 903d5a0..f857593 100644
--- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/ClientProxyImpl.java
+++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/ClientProxyImpl.java
@@ -949,7 +949,7 @@ public class ClientProxyImpl extends AbstractClient implements
InvocationCallback<Object> asyncCallback) {
outMessage.getExchange().setSynchronous(false);
setAsyncMessageObserverIfNeeded(outMessage.getExchange());
- JaxrsClientCallback<?> cb = newJaxrsClientCallback(asyncCallback,
+ JaxrsClientCallback<?> cb = newJaxrsClientCallback(asyncCallback, outMessage,
ori.getMethodToInvoke().getReturnType(), ori.getMethodToInvoke().getGenericReturnType());
outMessage.getExchange().put(JaxrsClientCallback.class, cb);
doRunInterceptorChain(outMessage);
@@ -958,6 +958,7 @@ public class ClientProxyImpl extends AbstractClient implements
}
protected JaxrsClientCallback<?> newJaxrsClientCallback(InvocationCallback<Object> asyncCallback,
+ Message outMessage,
Class<?> responseClass,
Type outGenericType) {
return new JaxrsClientCallback<>(asyncCallback, responseClass, outGenericType);
diff --git a/rt/rs/microprofile-client/pom.xml b/rt/rs/microprofile-client/pom.xml
index a5133f7..a5080a7 100644
--- a/rt/rs/microprofile-client/pom.xml
+++ b/rt/rs/microprofile-client/pom.xml
@@ -173,6 +173,12 @@
<version>${cxf.commons-jcs-jcache.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>mockwebserver</artifactId>
+ <version>4.0.1</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<profiles>
<profile>
diff --git a/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MPRestClientCallback.java b/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MPRestClientCallback.java
index a9c8857..af61061 100644
--- a/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MPRestClientCallback.java
+++ b/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MPRestClientCallback.java
@@ -20,81 +20,59 @@
package org.apache.cxf.microprofile.client;
import java.lang.reflect.Type;
-import java.util.Map;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import javax.ws.rs.client.InvocationCallback;
import org.apache.cxf.jaxrs.client.JaxrsClientCallback;
+import org.apache.cxf.message.Message;
public class MPRestClientCallback<T> extends JaxrsClientCallback<T> {
+ private final ExecutorService executor;
public MPRestClientCallback(InvocationCallback<T> handler,
+ Message outMessage,
Class<?> responseClass,
Type outGenericType) {
super(handler, responseClass, outGenericType);
+ ExecutorService es = outMessage.get(ExecutorService.class);
+ executor = es != null ? es : ForkJoinPool.commonPool();
}
+ @SuppressWarnings("unchecked")
@Override
public Future<T> createFuture() {
- return new MPRestClientResponseFuture<T>(this);
- }
-
- static class MPRestClientResponseFuture<T> extends CompletableFuture<T> implements Future<T> {
- MPRestClientCallback<T> callback;
- MPRestClientResponseFuture(MPRestClientCallback<T> cb) {
- callback = cb;
- }
-
- public Map<String, Object> getContext() {
- try {
- return callback.getResponseContext();
- } catch (Exception ex) {
- return null;
- }
- }
- public boolean cancel(boolean mayInterruptIfRunning) {
- return callback.cancel(mayInterruptIfRunning);
- }
-
- public T get() throws InterruptedException, ExecutionException {
- try {
- return getObject(callback.get()[0]);
- } catch (InterruptedException ex) {
- InvocationCallback<T> handler = callback.getHandler();
- if (handler != null) {
- handler.failed(ex);
+ return (Future<T>)CompletableFuture.supplyAsync(() -> {
+ synchronized (this) {
+ if (!isDone()) {
+ try {
+ this.wait();
+ } catch (InterruptedException e) {
+ throw new CompletionException(e);
+ }
}
- throw ex;
}
- }
- public T get(long timeout, TimeUnit unit) throws InterruptedException,
- ExecutionException, TimeoutException {
+ if (exception != null) {
+ throw new CompletionException(exception);
+ }
+ if (isCancelled()) {
+ throw new CancellationException();
+ }
+ if (!isDone()) {
+ throw new IllegalStateException(
+ "CompletionStage has been notified, indicating completion, but is not completed.");
+ }
try {
- return getObject(callback.get(timeout, unit)[0]);
- } catch (InterruptedException ex) {
- InvocationCallback<T> handler = callback.getHandler();
- if (handler != null) {
- handler.failed(ex);
- }
- throw ex;
+ return get()[0];
+ } catch (InterruptedException | ExecutionException e) {
+ throw new CompletionException(e);
}
- }
-
- @SuppressWarnings("unchecked")
- private T getObject(Object object) {
- return (T)object;
- }
-
- public boolean isCancelled() {
- return callback.isCancelled();
- }
- public boolean isDone() {
- return callback.isDone();
- }
+ }, executor);
}
}
\ No newline at end of file
diff --git a/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/proxy/MicroProfileClientProxyImpl.java b/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/proxy/MicroProfileClientProxyImpl.java
index 99f5bee..aa5a771 100644
--- a/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/proxy/MicroProfileClientProxyImpl.java
+++ b/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/proxy/MicroProfileClientProxyImpl.java
@@ -165,9 +165,10 @@ public class MicroProfileClientProxyImpl extends ClientProxyImpl {
@Override
protected JaxrsClientCallback<?> newJaxrsClientCallback(InvocationCallback<Object> asyncCallback,
+ Message outMessage,
Class<?> responseClass,
Type outGenericType) {
- return new MPRestClientCallback<Object>(asyncCallback, responseClass, outGenericType);
+ return new MPRestClientCallback<Object>(asyncCallback, outMessage, responseClass, outGenericType);
}
@Override
diff --git a/rt/rs/microprofile-client/src/test/java/org/apache/cxf/microprofile/client/AsyncTest.java b/rt/rs/microprofile-client/src/test/java/org/apache/cxf/microprofile/client/AsyncTest.java
new file mode 100644
index 0000000..1cd9104
--- /dev/null
+++ b/rt/rs/microprofile-client/src/test/java/org/apache/cxf/microprofile/client/AsyncTest.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.microprofile.client;
+
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import org.apache.cxf.microprofile.client.mock.AsyncClient;
+import org.eclipse.microprofile.rest.client.RestClientBuilder;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class AsyncTest {
+
+ @Test
+ public void testAsyncClient() throws Exception {
+ MockWebServer mockWebServer = new MockWebServer();
+ URI uri = mockWebServer.url("/").uri();
+ AsyncClient client = RestClientBuilder.newBuilder()
+ .baseUri(uri)
+ .connectTimeout(5, TimeUnit.SECONDS)
+ .readTimeout(5, TimeUnit.SECONDS)
+ .build(AsyncClient.class);
+ assertNotNull(client);
+
+ mockWebServer.enqueue(new MockResponse().setBody("Hello"));
+ mockWebServer.enqueue(new MockResponse().setBody("World"));
+
+ String combined = client.get().thenCombine(client.get(), (a, b) -> {
+ return a + " " + b;
+ }).toCompletableFuture().get(10, TimeUnit.SECONDS);
+
+ assertTrue("Hello World".equals(combined) || "World Hello".equals(combined));
+ }
+}
diff --git a/rt/rs/microprofile-client/src/test/java/org/apache/cxf/microprofile/client/mock/AsyncClient.java b/rt/rs/microprofile-client/src/test/java/org/apache/cxf/microprofile/client/mock/AsyncClient.java
new file mode 100644
index 0000000..9b48cb2
--- /dev/null
+++ b/rt/rs/microprofile-client/src/test/java/org/apache/cxf/microprofile/client/mock/AsyncClient.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.microprofile.client.mock;
+
+import java.util.concurrent.CompletionStage;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+
+@Path("/")
+@Produces("text/plain")
+@Consumes("text/plain")
+public interface AsyncClient {
+
+ @GET
+ CompletionStage<String> get();
+}