You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by ti...@apache.org on 2018/09/04 14:17:34 UTC
aries-jax-rs-whiteboard git commit: Add support for OSGi Promises as
natively asynchronous return types from resource methods
Repository: aries-jax-rs-whiteboard
Updated Branches:
refs/heads/master 1eb36e0f6 -> 81643717c
Add support for OSGi Promises as natively asynchronous return types from resource methods
Project: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/commit/81643717
Tree: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/tree/81643717
Diff: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/diff/81643717
Branch: refs/heads/master
Commit: 81643717cac01b8c5ec4200a57b87cde04c9ce09
Parents: 1eb36e0
Author: Tim Ward <ti...@apache.org>
Authored: Tue Sep 4 15:15:34 2018 +0100
Committer: Tim Ward <ti...@apache.org>
Committed: Tue Sep 4 15:15:34 2018 +0100
----------------------------------------------------------------------
jax-rs.itests/itest.bndrun | 3 +-
jax-rs.itests/src/main/java/test/JaxrsTest.java | 92 +++++++++++++++++++-
.../main/java/test/types/TestAsyncResource.java | 60 ++++++++++++-
jax-rs.whiteboard/pom.xml | 5 ++
.../cxf/CxfJaxrsServiceRegistrator.java | 6 +-
.../internal/cxf/PromiseAwareJAXRSInvoker.java | 81 +++++++++++++++++
6 files changed, 240 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/81643717/jax-rs.itests/itest.bndrun
----------------------------------------------------------------------
diff --git a/jax-rs.itests/itest.bndrun b/jax-rs.itests/itest.bndrun
index 3727a5d..848ecec 100644
--- a/jax-rs.itests/itest.bndrun
+++ b/jax-rs.itests/itest.bndrun
@@ -61,6 +61,7 @@
org.osgi.util.function;version='[1.1.0,1.1.1)',\
org.osgi.util.promise;version='[1.1.0,1.1.1)',\
osgi.enroute.hamcrest.wrapper;version='[1.3.0,1.3.1)',\
- osgi.enroute.junit.wrapper;version='[4.12.0,4.12.1)'
+ osgi.enroute.junit.wrapper;version='[4.12.0,4.12.1)',\
+ org.apache.felix.http.api;version='[3.0.0,3.0.1)'
-include: -personal.bnd
http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/81643717/jax-rs.itests/src/main/java/test/JaxrsTest.java
----------------------------------------------------------------------
diff --git a/jax-rs.itests/src/main/java/test/JaxrsTest.java b/jax-rs.itests/src/main/java/test/JaxrsTest.java
index b4f412d..c834278 100644
--- a/jax-rs.itests/src/main/java/test/JaxrsTest.java
+++ b/jax-rs.itests/src/main/java/test/JaxrsTest.java
@@ -1087,7 +1087,7 @@ public class JaxrsTest extends TestHelper {
throws ExecutionException, InterruptedException {
WebTarget webTarget =
- createDefaultTarget().path("whiteboard").path("async").
+ createDefaultTarget().path("whiteboard").path("async").path("suspended").
path("HelloAsync");
AtomicBoolean pre = new AtomicBoolean();
@@ -1127,12 +1127,100 @@ public class JaxrsTest extends TestHelper {
}
@Test
+ public void testAsyncResourceCompletionStage()
+ throws ExecutionException, InterruptedException {
+
+ WebTarget webTarget =
+ createDefaultTarget().path("whiteboard").path("async").path("completionstage").
+ path("HelloAsync");
+
+ AtomicBoolean pre = new AtomicBoolean();
+ AtomicBoolean post = new AtomicBoolean();
+
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+
+ registerAddon(
+ new TestAsyncResource(
+ () -> pre.set(true),
+ () -> {
+ post.set(true);
+
+ countDownLatch.countDown();
+ }));
+
+ Future<String> future = webTarget.request().async().get(
+ new InvocationCallback<String>() {
+ @Override
+ public void completed(String s) {
+ assertTrue(pre.get());
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+
+ }
+ });
+
+ String result = future.get();
+
+ countDownLatch.await(1, TimeUnit.MINUTES);
+
+ assertTrue(post.get());
+
+ assertEquals("This should say HelloAsync", "HelloAsync", result);
+ }
+
+ @Test
+ public void testAsyncResourcePromise()
+ throws ExecutionException, InterruptedException {
+
+ WebTarget webTarget =
+ createDefaultTarget().path("whiteboard").path("async").path("promise").
+ path("HelloAsync");
+
+ AtomicBoolean pre = new AtomicBoolean();
+ AtomicBoolean post = new AtomicBoolean();
+
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+
+ registerAddon(
+ new TestAsyncResource(
+ () -> pre.set(true),
+ () -> {
+ post.set(true);
+
+ countDownLatch.countDown();
+ }));
+
+ Future<String> future = webTarget.request().async().get(
+ new InvocationCallback<String>() {
+ @Override
+ public void completed(String s) {
+ assertTrue(pre.get());
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+
+ }
+ });
+
+ String result = future.get();
+
+ countDownLatch.await(1, TimeUnit.MINUTES);
+
+ assertTrue(post.get());
+
+ assertEquals("This should say HelloAsync", "HelloAsync", result);
+ }
+
+ @Test
public void testAsyncResourceClientWithPromises()
throws ExecutionException, InterruptedException,
InvocationTargetException {
WebTarget webTarget =
- createDefaultTarget().path("whiteboard").path("async").
+ createDefaultTarget().path("whiteboard").path("async").path("promise").
path("HelloAsync");
AtomicBoolean pre = new AtomicBoolean();
http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/81643717/jax-rs.itests/src/main/java/test/types/TestAsyncResource.java
----------------------------------------------------------------------
diff --git a/jax-rs.itests/src/main/java/test/types/TestAsyncResource.java b/jax-rs.itests/src/main/java/test/types/TestAsyncResource.java
index 4297aee..abe152f 100644
--- a/jax-rs.itests/src/main/java/test/types/TestAsyncResource.java
+++ b/jax-rs.itests/src/main/java/test/types/TestAsyncResource.java
@@ -17,6 +17,9 @@
package test.types;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
@@ -25,6 +28,9 @@ import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
+import org.osgi.util.promise.Deferred;
+import org.osgi.util.promise.Promise;
+
@Path("whiteboard/async")
public class TestAsyncResource {
@@ -37,7 +43,7 @@ public class TestAsyncResource {
}
@GET
- @Path("{name}")
+ @Path("suspended/{name}")
@Produces(MediaType.TEXT_PLAIN)
public void echo(@Suspended AsyncResponse async,
@PathParam("name") String value) {
@@ -58,4 +64,56 @@ public class TestAsyncResource {
}
}).start();
}
+
+ @GET
+ @Path("completionstage/{name}")
+ @Produces(MediaType.TEXT_PLAIN)
+ public CompletionStage<String> echoCompletionStage(@PathParam("name") String value) {
+
+ CompletableFuture<String> cf = new CompletableFuture<>();
+
+ new Thread(() -> {
+ try {
+ try {
+ Thread.sleep(1000);
+ } catch (Exception e) {
+ preResume.run();
+ cf.completeExceptionally(e);
+ return;
+ }
+ preResume.run();
+ cf.complete(value);
+ } finally {
+ postResume.run();
+ }
+ }).start();
+
+ return cf;
+ }
+
+ @GET
+ @Path("promise/{name}")
+ @Produces(MediaType.TEXT_PLAIN)
+ public Promise<String> echoPromise(@PathParam("name") String value) {
+
+ Deferred<String> d = new Deferred<>();
+
+ new Thread(() -> {
+ try {
+ try {
+ Thread.sleep(1000);
+ } catch (Exception e) {
+ preResume.run();
+ d.fail(e);
+ return;
+ }
+ preResume.run();
+ d.resolve(value);
+ } finally {
+ postResume.run();
+ }
+ }).start();
+
+ return d.getPromise();
+ }
}
http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/81643717/jax-rs.whiteboard/pom.xml
----------------------------------------------------------------------
diff --git a/jax-rs.whiteboard/pom.xml b/jax-rs.whiteboard/pom.xml
index 369a2e9..b627704 100644
--- a/jax-rs.whiteboard/pom.xml
+++ b/jax-rs.whiteboard/pom.xml
@@ -125,6 +125,11 @@
<version>1.1.0</version>
</dependency>
<dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.util.function</artifactId>
+ <version>1.1.0</version>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.2</version>
http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/81643717/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/CxfJaxrsServiceRegistrator.java
----------------------------------------------------------------------
diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/CxfJaxrsServiceRegistrator.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/CxfJaxrsServiceRegistrator.java
index b6a5687..a70fada 100644
--- a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/CxfJaxrsServiceRegistrator.java
+++ b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/CxfJaxrsServiceRegistrator.java
@@ -18,8 +18,8 @@
package org.apache.aries.jax.rs.whiteboard.internal.cxf;
import static java.util.stream.Collectors.toMap;
-import static org.apache.aries.jax.rs.whiteboard.internal.utils.Utils.canonicalize;
import static org.apache.aries.jax.rs.whiteboard.internal.Whiteboard.SUPPORTED_EXTENSION_INTERFACES;
+import static org.apache.aries.jax.rs.whiteboard.internal.utils.Utils.canonicalize;
import static org.apache.cxf.jaxrs.provider.ProviderFactory.DEFAULT_FILTER_NAME_BINDING;
import java.util.ArrayList;
@@ -40,9 +40,9 @@ import javax.ws.rs.core.Feature;
import javax.ws.rs.core.FeatureContext;
import javax.ws.rs.ext.RuntimeDelegate;
+import org.apache.aries.component.dsl.CachingServiceReference;
import org.apache.aries.jax.rs.whiteboard.internal.utils.ServiceReferenceResourceProvider;
import org.apache.aries.jax.rs.whiteboard.internal.utils.ServiceTuple;
-import org.apache.aries.component.dsl.CachingServiceReference;
import org.apache.cxf.Bus;
import org.apache.cxf.common.util.ClassHelper;
import org.apache.cxf.endpoint.Server;
@@ -57,7 +57,6 @@ import org.apache.cxf.jaxrs.provider.ServerConfigurableFactory;
import org.apache.cxf.jaxrs.sse.SseContextProvider;
import org.apache.cxf.jaxrs.sse.SseEventSinkContextProvider;
import org.apache.cxf.jaxrs.utils.AnnotationUtils;
-import org.osgi.framework.ServiceObjects;
public class CxfJaxrsServiceRegistrator {
@@ -212,6 +211,7 @@ public class CxfJaxrsServiceRegistrator {
_jaxRsServerFactoryBean = createEndpoint(
application, JAXRSServerFactoryBean.class);
+ _jaxRsServerFactoryBean.setInvoker(new PromiseAwareJAXRSInvoker());
_jaxRsServerFactoryBean.setBus(_bus);
_bus.setExtension(
http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/81643717/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/PromiseAwareJAXRSInvoker.java
----------------------------------------------------------------------
diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/PromiseAwareJAXRSInvoker.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/PromiseAwareJAXRSInvoker.java
new file mode 100644
index 0000000..91c091c
--- /dev/null
+++ b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/PromiseAwareJAXRSInvoker.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.jax.rs.whiteboard.internal.cxf;
+
+import java.util.Arrays;
+
+import org.apache.cxf.jaxrs.JAXRSInvoker;
+import org.apache.cxf.jaxrs.impl.AsyncResponseImpl;
+import org.apache.cxf.message.Message;
+import org.osgi.util.promise.Promise;
+
+public class PromiseAwareJAXRSInvoker extends JAXRSInvoker {
+
+ /**
+ * OSGi promises are a great way to do asynchronous work, and should be handled
+ * natively just like a CompletionStage
+ */
+ protected AsyncResponseImpl checkFutureResponse(Message inMessage, Object result) {
+
+ // Fast path - do they share our view of the Promise
+ if (result instanceof Promise) {
+ return handlePromise(inMessage, (Promise<?>) result);
+ }
+
+ // Slower check, is it a Promise?
+ Class<?> clazz = result.getClass();
+ if(Arrays.stream(clazz.getInterfaces())
+ .map(Class::getName)
+ .anyMatch(n -> "org.osgi.util.promise.Promise".equals(n))) {
+
+ return handlePromiseFromAnotherClassSpace(inMessage, result, clazz);
+ }
+
+ return super.checkFutureResponse(inMessage, result);
+ }
+
+ private AsyncResponseImpl handlePromise(Message inMessage, final Promise<?> promise) {
+ final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
+ promise.onSuccess(asyncResponse::resume)
+ .onFailure(asyncResponse::resume);
+ return asyncResponse;
+ }
+
+ private AsyncResponseImpl handlePromiseFromAnotherClassSpace(Message inMessage, Object result, Class<?> clazz) {
+ // It's a promise, but from a different class space. Use reflection to
+ // register a callback with the promise
+ final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
+ try {
+ clazz.getMethod("onResolve", Runnable.class).invoke(result, (Runnable) () -> {
+ try {
+ Object failure = clazz.getMethod("getFailure").invoke(result);
+
+ if(failure != null) {
+ asyncResponse.resume((Throwable) failure);
+ } else {
+ asyncResponse.resume(clazz.getMethod("getValue").invoke(result));
+ }
+ } catch (Exception e) {
+ asyncResponse.resume(e);
+ }
+ });
+ } catch (Exception e) {
+ asyncResponse.resume(e);
+ }
+ return asyncResponse;
+ }
+}