You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by ti...@apache.org on 2018/09/04 14:17:34 UTC

aries-jax-rs-whiteboard git commit: Add support for OSGi Promises as natively asynchronous return types from resource methods

Repository: aries-jax-rs-whiteboard
Updated Branches:
  refs/heads/master 1eb36e0f6 -> 81643717c


Add support for OSGi Promises as natively asynchronous return types from resource methods


Project: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/commit/81643717
Tree: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/tree/81643717
Diff: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/diff/81643717

Branch: refs/heads/master
Commit: 81643717cac01b8c5ec4200a57b87cde04c9ce09
Parents: 1eb36e0
Author: Tim Ward <ti...@apache.org>
Authored: Tue Sep 4 15:15:34 2018 +0100
Committer: Tim Ward <ti...@apache.org>
Committed: Tue Sep 4 15:15:34 2018 +0100

----------------------------------------------------------------------
 jax-rs.itests/itest.bndrun                      |  3 +-
 jax-rs.itests/src/main/java/test/JaxrsTest.java | 92 +++++++++++++++++++-
 .../main/java/test/types/TestAsyncResource.java | 60 ++++++++++++-
 jax-rs.whiteboard/pom.xml                       |  5 ++
 .../cxf/CxfJaxrsServiceRegistrator.java         |  6 +-
 .../internal/cxf/PromiseAwareJAXRSInvoker.java  | 81 +++++++++++++++++
 6 files changed, 240 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/81643717/jax-rs.itests/itest.bndrun
----------------------------------------------------------------------
diff --git a/jax-rs.itests/itest.bndrun b/jax-rs.itests/itest.bndrun
index 3727a5d..848ecec 100644
--- a/jax-rs.itests/itest.bndrun
+++ b/jax-rs.itests/itest.bndrun
@@ -61,6 +61,7 @@
 	org.osgi.util.function;version='[1.1.0,1.1.1)',\
 	org.osgi.util.promise;version='[1.1.0,1.1.1)',\
 	osgi.enroute.hamcrest.wrapper;version='[1.3.0,1.3.1)',\
-	osgi.enroute.junit.wrapper;version='[4.12.0,4.12.1)'
+	osgi.enroute.junit.wrapper;version='[4.12.0,4.12.1)',\
+	org.apache.felix.http.api;version='[3.0.0,3.0.1)'
 
 -include: -personal.bnd

http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/81643717/jax-rs.itests/src/main/java/test/JaxrsTest.java
----------------------------------------------------------------------
diff --git a/jax-rs.itests/src/main/java/test/JaxrsTest.java b/jax-rs.itests/src/main/java/test/JaxrsTest.java
index b4f412d..c834278 100644
--- a/jax-rs.itests/src/main/java/test/JaxrsTest.java
+++ b/jax-rs.itests/src/main/java/test/JaxrsTest.java
@@ -1087,7 +1087,7 @@ public class JaxrsTest extends TestHelper {
         throws ExecutionException, InterruptedException {
 
         WebTarget webTarget =
-            createDefaultTarget().path("whiteboard").path("async").
+            createDefaultTarget().path("whiteboard").path("async").path("suspended").
                 path("HelloAsync");
 
         AtomicBoolean pre = new AtomicBoolean();
@@ -1127,12 +1127,100 @@ public class JaxrsTest extends TestHelper {
     }
 
     @Test
+    public void testAsyncResourceCompletionStage()
+            throws ExecutionException, InterruptedException {
+        // Target the completionstage endpoint of TestAsyncResource
+        WebTarget target = createDefaultTarget()
+            .path("whiteboard")
+            .path("async")
+            .path("completionstage")
+            .path("HelloAsync");
+
+        // Flags set by the callbacks passed to TestAsyncResource, plus a latch on the second
+        AtomicBoolean preFlag = new AtomicBoolean();
+        AtomicBoolean postFlag = new AtomicBoolean();
+        CountDownLatch postLatch = new CountDownLatch(1);
+
+        registerAddon(
+            new TestAsyncResource(() -> preFlag.set(true), () -> {
+                postFlag.set(true);
+                postLatch.countDown();
+            }));
+
+        // The client callback checks the first flag once the response has completed
+        InvocationCallback<String> callback = new InvocationCallback<String>() {
+            @Override
+            public void completed(String s) {
+                assertTrue(preFlag.get());
+            }
+
+            @Override
+            public void failed(Throwable throwable) {
+                // no action is taken on failure
+            }
+        };
+        Future<String> pending = target.request().async().get(callback);
+
+        String body = pending.get();
+
+        // Wait (up to a minute) for the second callback before checking its flag
+        postLatch.await(1, TimeUnit.MINUTES);
+        assertTrue(postFlag.get());
+
+        assertEquals("This should say HelloAsync", "HelloAsync", body);
+    }
+
+    @Test
+    public void testAsyncResourcePromise()
+            throws ExecutionException, InterruptedException {
+        // Target the promise endpoint of TestAsyncResource
+        WebTarget target = createDefaultTarget()
+            .path("whiteboard")
+            .path("async")
+            .path("promise")
+            .path("HelloAsync");
+
+        // Flags set by the callbacks passed to TestAsyncResource, plus a latch on the second
+        AtomicBoolean preFlag = new AtomicBoolean();
+        AtomicBoolean postFlag = new AtomicBoolean();
+        CountDownLatch postLatch = new CountDownLatch(1);
+
+        registerAddon(
+            new TestAsyncResource(() -> preFlag.set(true), () -> {
+                postFlag.set(true);
+                postLatch.countDown();
+            }));
+
+        // The client callback checks the first flag once the response has completed
+        InvocationCallback<String> callback = new InvocationCallback<String>() {
+            @Override
+            public void completed(String s) {
+                assertTrue(preFlag.get());
+            }
+
+            @Override
+            public void failed(Throwable throwable) {
+                // no action is taken on failure
+            }
+        };
+        Future<String> pending = target.request().async().get(callback);
+
+        String body = pending.get();
+
+        // Wait (up to a minute) for the second callback before checking its flag
+        postLatch.await(1, TimeUnit.MINUTES);
+        assertTrue(postFlag.get());
+
+        assertEquals("This should say HelloAsync", "HelloAsync", body);
+    }
+
+    @Test
     public void testAsyncResourceClientWithPromises()
         throws ExecutionException, InterruptedException,
         InvocationTargetException {
 
         WebTarget webTarget =
-            createDefaultTarget().path("whiteboard").path("async").
+            createDefaultTarget().path("whiteboard").path("async").path("promise").
                 path("HelloAsync");
 
         AtomicBoolean pre = new AtomicBoolean();

http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/81643717/jax-rs.itests/src/main/java/test/types/TestAsyncResource.java
----------------------------------------------------------------------
diff --git a/jax-rs.itests/src/main/java/test/types/TestAsyncResource.java b/jax-rs.itests/src/main/java/test/types/TestAsyncResource.java
index 4297aee..abe152f 100644
--- a/jax-rs.itests/src/main/java/test/types/TestAsyncResource.java
+++ b/jax-rs.itests/src/main/java/test/types/TestAsyncResource.java
@@ -17,6 +17,9 @@
 
 package test.types;
 
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
@@ -25,6 +28,9 @@ import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.MediaType;
 
+import org.osgi.util.promise.Deferred;
+import org.osgi.util.promise.Promise;
+
 @Path("whiteboard/async")
 public class TestAsyncResource {
 
@@ -37,7 +43,7 @@ public class TestAsyncResource {
     }
 
     @GET
-    @Path("{name}")
+    @Path("suspended/{name}")
     @Produces(MediaType.TEXT_PLAIN)
     public void echo(@Suspended AsyncResponse async,
                      @PathParam("name") String value) {
@@ -58,4 +64,56 @@ public class TestAsyncResource {
             }
         }).start();
     }
+
+    /**
+     * Echoes the name through a CompletionStage which a background thread
+     * completes after sleeping for one second.
+     */
+    @GET
+    @Path("completionstage/{name}")
+    @Produces(MediaType.TEXT_PLAIN)
+    public CompletionStage<String> echoCompletionStage(@PathParam("name") String value) {
+        final CompletableFuture<String> stage = new CompletableFuture<>();
+        new Thread(() -> {
+            try {
+                Runnable outcome = () -> stage.complete(value);
+                try {
+                    Thread.sleep(1000);
+                } catch (Exception e) {
+                    outcome = () -> stage.completeExceptionally(e);
+                }
+                preResume.run();
+                outcome.run();
+            } finally {
+                postResume.run();
+            }
+        }).start();
+        return stage;
+    }
+
+    /**
+     * Echoes the name through an OSGi Promise which a background thread
+     * resolves after sleeping for one second.
+     */
+    @GET
+    @Path("promise/{name}")
+    @Produces(MediaType.TEXT_PLAIN)
+    public Promise<String> echoPromise(@PathParam("name") String value) {
+        final Deferred<String> deferred = new Deferred<>();
+        new Thread(() -> {
+            try {
+                Runnable outcome = () -> deferred.resolve(value);
+                try {
+                    Thread.sleep(1000);
+                } catch (Exception e) {
+                    outcome = () -> deferred.fail(e);
+                }
+                preResume.run();
+                outcome.run();
+            } finally {
+                postResume.run();
+            }
+        }).start();
+        return deferred.getPromise();
+    }
 }

http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/81643717/jax-rs.whiteboard/pom.xml
----------------------------------------------------------------------
diff --git a/jax-rs.whiteboard/pom.xml b/jax-rs.whiteboard/pom.xml
index 369a2e9..b627704 100644
--- a/jax-rs.whiteboard/pom.xml
+++ b/jax-rs.whiteboard/pom.xml
@@ -125,6 +125,11 @@
             <version>1.1.0</version>
         </dependency>
         <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.util.function</artifactId>
+            <version>1.1.0</version>
+        </dependency>
+        <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
             <version>1.7.2</version>

http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/81643717/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/CxfJaxrsServiceRegistrator.java
----------------------------------------------------------------------
diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/CxfJaxrsServiceRegistrator.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/CxfJaxrsServiceRegistrator.java
index b6a5687..a70fada 100644
--- a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/CxfJaxrsServiceRegistrator.java
+++ b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/CxfJaxrsServiceRegistrator.java
@@ -18,8 +18,8 @@
 package org.apache.aries.jax.rs.whiteboard.internal.cxf;
 
 import static java.util.stream.Collectors.toMap;
-import static org.apache.aries.jax.rs.whiteboard.internal.utils.Utils.canonicalize;
 import static org.apache.aries.jax.rs.whiteboard.internal.Whiteboard.SUPPORTED_EXTENSION_INTERFACES;
+import static org.apache.aries.jax.rs.whiteboard.internal.utils.Utils.canonicalize;
 import static org.apache.cxf.jaxrs.provider.ProviderFactory.DEFAULT_FILTER_NAME_BINDING;
 
 import java.util.ArrayList;
@@ -40,9 +40,9 @@ import javax.ws.rs.core.Feature;
 import javax.ws.rs.core.FeatureContext;
 import javax.ws.rs.ext.RuntimeDelegate;
 
+import org.apache.aries.component.dsl.CachingServiceReference;
 import org.apache.aries.jax.rs.whiteboard.internal.utils.ServiceReferenceResourceProvider;
 import org.apache.aries.jax.rs.whiteboard.internal.utils.ServiceTuple;
-import org.apache.aries.component.dsl.CachingServiceReference;
 import org.apache.cxf.Bus;
 import org.apache.cxf.common.util.ClassHelper;
 import org.apache.cxf.endpoint.Server;
@@ -57,7 +57,6 @@ import org.apache.cxf.jaxrs.provider.ServerConfigurableFactory;
 import org.apache.cxf.jaxrs.sse.SseContextProvider;
 import org.apache.cxf.jaxrs.sse.SseEventSinkContextProvider;
 import org.apache.cxf.jaxrs.utils.AnnotationUtils;
-import org.osgi.framework.ServiceObjects;
 
 public class CxfJaxrsServiceRegistrator {
 
@@ -212,6 +211,7 @@ public class CxfJaxrsServiceRegistrator {
         _jaxRsServerFactoryBean = createEndpoint(
             application, JAXRSServerFactoryBean.class);
 
+        _jaxRsServerFactoryBean.setInvoker(new PromiseAwareJAXRSInvoker());
         _jaxRsServerFactoryBean.setBus(_bus);
 
         _bus.setExtension(

http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/81643717/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/PromiseAwareJAXRSInvoker.java
----------------------------------------------------------------------
diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/PromiseAwareJAXRSInvoker.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/PromiseAwareJAXRSInvoker.java
new file mode 100644
index 0000000..91c091c
--- /dev/null
+++ b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/PromiseAwareJAXRSInvoker.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.jax.rs.whiteboard.internal.cxf;
+
+import java.util.Arrays;
+
+import org.apache.cxf.jaxrs.JAXRSInvoker;
+import org.apache.cxf.jaxrs.impl.AsyncResponseImpl;
+import org.apache.cxf.message.Message;
+import org.osgi.util.promise.Promise;
+
+public class PromiseAwareJAXRSInvoker extends JAXRSInvoker {
+    /**
+     * OSGi promises are a great way to do asynchronous work, and are handled natively
+     * just like a CompletionStage. Other results, including null, go to the superclass.
+     */
+    @Override
+    protected AsyncResponseImpl checkFutureResponse(Message inMessage, Object result) {
+        // Fast path - do they share our view of the Promise?
+        if (result instanceof Promise) {
+            return handlePromise(inMessage, (Promise<?>) result);
+        }
+        // Slower check - is it a Promise from another class space? (null has no class)
+        if (result != null) {
+            Class<?> promiseType = Arrays.stream(result.getClass().getInterfaces())
+                .filter(type -> "org.osgi.util.promise.Promise".equals(type.getName()))
+                .findFirst().orElse(null);
+            if (promiseType != null) {
+                return handlePromiseFromAnotherClassSpace(inMessage, result, promiseType);
+            }
+        }
+        return super.checkFutureResponse(inMessage, result);
+    }
+
+    /** Resumes the response with the promise's value, or its failure, once resolved. */
+    private AsyncResponseImpl handlePromise(Message inMessage, final Promise<?> promise) {
+        final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
+        promise.onSuccess(asyncResponse::resume).onFailure(asyncResponse::resume);
+        return asyncResponse;
+    }
+
+    /**
+     * Registers a callback reflectively on a Promise from another class space. Methods
+     * are looked up on the public Promise interface because the implementation class may
+     * not be public, in which case invoking a method found on it is denied access.
+     */
+    private AsyncResponseImpl handlePromiseFromAnotherClassSpace(Message inMessage, Object result, Class<?> promiseType) {
+        final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
+        try {
+            promiseType.getMethod("onResolve", Runnable.class).invoke(result, (Runnable) () -> {
+                    try {
+                        Object failure = promiseType.getMethod("getFailure").invoke(result);
+                        if (failure != null) {
+                            asyncResponse.resume((Throwable) failure);
+                        } else {
+                            asyncResponse.resume(promiseType.getMethod("getValue").invoke(result));
+                        }
+                    } catch (Exception e) {
+                        asyncResponse.resume(e);
+                    }
+                });
+        } catch (Exception e) {
+            asyncResponse.resume(e);
+        }
+        return asyncResponse;
+    }
+}