You are viewing a plain-text version of this content; the canonical link to the original message was not preserved in this copy.
Posted to commits@cxf.apache.org by am...@apache.org on 2019/07/30 14:07:41 UTC

[cxf] branch master updated: [CXF-8080] Ensure stages from async methods are completed

This is an automated email from the ASF dual-hosted git repository.

amccright pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cxf.git


The following commit(s) were added to refs/heads/master by this push:
     new a0863e6  [CXF-8080] Ensure stages from async methods are completed
     new 6bff3cb  Merge pull request #570 from andymc12/8080-asyncStages
a0863e6 is described below

commit a0863e63b2c874307b2a8401a2dccbd95b724a3e
Author: Andy McCright <j....@gmail.com>
AuthorDate: Mon Jul 29 09:50:52 2019 -0500

    [CXF-8080] Ensure stages from async methods are completed
---
 .../apache/cxf/jaxrs/client/ClientProxyImpl.java   |  3 +-
 rt/rs/microprofile-client/pom.xml                  |  6 ++
 .../microprofile/client/MPRestClientCallback.java  | 86 ++++++++--------------
 .../client/proxy/MicroProfileClientProxyImpl.java  |  3 +-
 .../apache/cxf/microprofile/client/AsyncTest.java  | 56 ++++++++++++++
 .../cxf/microprofile/client/mock/AsyncClient.java  | 35 +++++++++
 6 files changed, 133 insertions(+), 56 deletions(-)

diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/ClientProxyImpl.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/ClientProxyImpl.java
index 903d5a0..f857593 100644
--- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/ClientProxyImpl.java
+++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/ClientProxyImpl.java
@@ -949,7 +949,7 @@ public class ClientProxyImpl extends AbstractClient implements
                                    InvocationCallback<Object> asyncCallback) {
         outMessage.getExchange().setSynchronous(false);
         setAsyncMessageObserverIfNeeded(outMessage.getExchange());
-        JaxrsClientCallback<?> cb = newJaxrsClientCallback(asyncCallback,
+        JaxrsClientCallback<?> cb = newJaxrsClientCallback(asyncCallback, outMessage,
             ori.getMethodToInvoke().getReturnType(), ori.getMethodToInvoke().getGenericReturnType());
         outMessage.getExchange().put(JaxrsClientCallback.class, cb);
         doRunInterceptorChain(outMessage);
@@ -958,6 +958,7 @@ public class ClientProxyImpl extends AbstractClient implements
     }
 
     protected JaxrsClientCallback<?> newJaxrsClientCallback(InvocationCallback<Object> asyncCallback,
+                                                            Message outMessage,
                                                             Class<?> responseClass,
                                                             Type outGenericType) {
         return new JaxrsClientCallback<>(asyncCallback, responseClass, outGenericType);
diff --git a/rt/rs/microprofile-client/pom.xml b/rt/rs/microprofile-client/pom.xml
index a5133f7..a5080a7 100644
--- a/rt/rs/microprofile-client/pom.xml
+++ b/rt/rs/microprofile-client/pom.xml
@@ -173,6 +173,12 @@
           <version>${cxf.commons-jcs-jcache.version}</version>
           <scope>test</scope>
         </dependency>
+        <dependency>
+          <groupId>com.squareup.okhttp3</groupId>
+          <artifactId>mockwebserver</artifactId>
+          <version>4.0.1</version>
+          <scope>test</scope>
+        </dependency>
     </dependencies>
     <profiles>
         <profile>
diff --git a/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MPRestClientCallback.java b/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MPRestClientCallback.java
index a9c8857..af61061 100644
--- a/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MPRestClientCallback.java
+++ b/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MPRestClientCallback.java
@@ -20,81 +20,59 @@
 package org.apache.cxf.microprofile.client;
 
 import java.lang.reflect.Type;
-import java.util.Map;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import javax.ws.rs.client.InvocationCallback;
 
 import org.apache.cxf.jaxrs.client.JaxrsClientCallback;
+import org.apache.cxf.message.Message;
 
 public class MPRestClientCallback<T> extends JaxrsClientCallback<T> {
+    private final ExecutorService executor;
 
     public MPRestClientCallback(InvocationCallback<T> handler,
+                                Message outMessage,
                                 Class<?> responseClass,
                                 Type outGenericType) {
         super(handler, responseClass, outGenericType);
+        ExecutorService es = outMessage.get(ExecutorService.class);
+        executor = es != null ? es : ForkJoinPool.commonPool();
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public Future<T> createFuture() {
-        return new MPRestClientResponseFuture<T>(this);
-    }
-
-    static class MPRestClientResponseFuture<T> extends CompletableFuture<T> implements Future<T> {
-        MPRestClientCallback<T> callback;
-        MPRestClientResponseFuture(MPRestClientCallback<T> cb) {
-            callback = cb;
-        }
-
-        public Map<String, Object> getContext() {
-            try {
-                return callback.getResponseContext();
-            } catch (Exception ex) {
-                return null;
-            }
-        }
-        public boolean cancel(boolean mayInterruptIfRunning) {
-            return callback.cancel(mayInterruptIfRunning);
-        }
-
-        public T get() throws InterruptedException, ExecutionException {
-            try {
-                return getObject(callback.get()[0]);
-            } catch (InterruptedException ex) {
-                InvocationCallback<T> handler = callback.getHandler();
-                if (handler != null) {
-                    handler.failed(ex);
+        return (Future<T>)CompletableFuture.supplyAsync(() -> {
+            synchronized (this) {
+                if (!isDone()) {
+                    try {
+                        this.wait();
+                    } catch (InterruptedException e) {
+                        throw new CompletionException(e);
+                    }
                 }
-                throw ex;
             }
-        }
-        public T get(long timeout, TimeUnit unit) throws InterruptedException,
-            ExecutionException, TimeoutException {
+            if (exception != null) {
+                throw new CompletionException(exception);
+            }
+            if (isCancelled()) {
+                throw new CancellationException();
+            }
+            if (!isDone()) {
+                throw new IllegalStateException(
+                    "CompletionStage has been notified, indicating completion, but is not completed.");
+            }
             try {
-                return getObject(callback.get(timeout, unit)[0]);
-            } catch (InterruptedException ex) {
-                InvocationCallback<T> handler = callback.getHandler();
-                if (handler != null) {
-                    handler.failed(ex);
-                }
-                throw ex;
+                return get()[0];
+            } catch (InterruptedException | ExecutionException e) {
+                throw new CompletionException(e);
             }
-        }
-
-        @SuppressWarnings("unchecked")
-        private T getObject(Object object) {
-            return (T)object;
-        }
-
-        public boolean isCancelled() {
-            return callback.isCancelled();
-        }
-        public boolean isDone() {
-            return callback.isDone();
-        }
+        }, executor);
     }
 }
\ No newline at end of file
diff --git a/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/proxy/MicroProfileClientProxyImpl.java b/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/proxy/MicroProfileClientProxyImpl.java
index 99f5bee..aa5a771 100644
--- a/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/proxy/MicroProfileClientProxyImpl.java
+++ b/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/proxy/MicroProfileClientProxyImpl.java
@@ -165,9 +165,10 @@ public class MicroProfileClientProxyImpl extends ClientProxyImpl {
 
     @Override
     protected JaxrsClientCallback<?> newJaxrsClientCallback(InvocationCallback<Object> asyncCallback,
+                                                            Message outMessage,
                                                             Class<?> responseClass,
                                                             Type outGenericType) {
-        return new MPRestClientCallback<Object>(asyncCallback, responseClass, outGenericType);
+        return new MPRestClientCallback<Object>(asyncCallback, outMessage, responseClass, outGenericType);
     }
 
     @Override
diff --git a/rt/rs/microprofile-client/src/test/java/org/apache/cxf/microprofile/client/AsyncTest.java b/rt/rs/microprofile-client/src/test/java/org/apache/cxf/microprofile/client/AsyncTest.java
new file mode 100644
index 0000000..1cd9104
--- /dev/null
+++ b/rt/rs/microprofile-client/src/test/java/org/apache/cxf/microprofile/client/AsyncTest.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.microprofile.client;
+
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import org.apache.cxf.microprofile.client.mock.AsyncClient;
+import org.eclipse.microprofile.rest.client.RestClientBuilder;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class AsyncTest {
+
+    @Test
+    public void testAsyncClient() throws Exception {
+        MockWebServer mockWebServer = new MockWebServer();
+        URI uri = mockWebServer.url("/").uri();
+        AsyncClient client = RestClientBuilder.newBuilder()
+                                              .baseUri(uri)
+                                              .connectTimeout(5, TimeUnit.SECONDS)
+                                              .readTimeout(5, TimeUnit.SECONDS)
+                                              .build(AsyncClient.class);
+        assertNotNull(client);
+
+        mockWebServer.enqueue(new MockResponse().setBody("Hello"));
+        mockWebServer.enqueue(new MockResponse().setBody("World"));
+
+        String combined = client.get().thenCombine(client.get(), (a, b) -> {
+            return a + " " + b;
+        }).toCompletableFuture().get(10, TimeUnit.SECONDS);
+
+        assertTrue("Hello World".equals(combined) || "World Hello".equals(combined));
+    }
+}
diff --git a/rt/rs/microprofile-client/src/test/java/org/apache/cxf/microprofile/client/mock/AsyncClient.java b/rt/rs/microprofile-client/src/test/java/org/apache/cxf/microprofile/client/mock/AsyncClient.java
new file mode 100644
index 0000000..9b48cb2
--- /dev/null
+++ b/rt/rs/microprofile-client/src/test/java/org/apache/cxf/microprofile/client/mock/AsyncClient.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.microprofile.client.mock;
+
+import java.util.concurrent.CompletionStage;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+
+@Path("/")
+@Produces("text/plain")
+@Consumes("text/plain")
+public interface AsyncClient {
+
+    @GET
+    CompletionStage<String> get();
+}