You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by se...@apache.org on 2017/08/28 16:36:51 UTC
cxf git commit: [CXF-7487] Basic RxJava2 Observable support, Flowable to follow later
Repository: cxf
Updated Branches:
refs/heads/master a55635031 -> 0c5d4032d
[CXF-7487] Basic RxJava2 Observable support, Flowable to follow later
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/0c5d4032
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/0c5d4032
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/0c5d4032
Branch: refs/heads/master
Commit: 0c5d4032d3ba7837e662abc192a27ae2814b21c9
Parents: a556350
Author: Sergey Beryozkin <sb...@gmail.com>
Authored: Mon Aug 28 17:36:37 2017 +0100
Committer: Sergey Beryozkin <sb...@gmail.com>
Committed: Mon Aug 28 17:36:37 2017 +0100
----------------------------------------------------------------------
parent/pom.xml | 10 +-
rt/rs/extensions/rx/pom.xml | 6 +
.../jaxrs/rx2/client/ObservableRxInvoker.java | 107 +++++++++++
.../rx2/client/ObservableRxInvokerImpl.java | 180 +++++++++++++++++++
.../rx2/client/ObservableRxInvokerProvider.java | 45 +++++
.../cxf/jaxrs/rx2/server/ObservableInvoker.java | 43 +++++
systests/jaxrs/pom.xml | 7 +-
.../jaxrs/reactive/JAXRSObservableTest.java | 141 ---------------
.../jaxrs/reactive/JAXRSRxJava2Test.java | 68 +++++++
.../systest/jaxrs/reactive/JAXRSRxJavaTest.java | 141 +++++++++++++++
.../jaxrs/reactive/ObservableServer.java | 79 --------
.../jaxrs/reactive/ObservableService.java | 122 -------------
.../reactive/RxJava2ObservableService.java | 43 +++++
.../systest/jaxrs/reactive/RxJava2Server.java | 73 ++++++++
.../jaxrs/reactive/RxJavaObservableService.java | 122 +++++++++++++
.../systest/jaxrs/reactive/RxJavaServer.java | 79 ++++++++
16 files changed, 921 insertions(+), 345 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index 424e28a..8d8e77b 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -101,7 +101,8 @@
<cxf.log4j.version>1.2.17</cxf.log4j.version>
<cxf.lucene.version>4.9.0</cxf.lucene.version>
<cxf.mina.version>2.0.14</cxf.mina.version>
- <cxf.rx.java.version>1.2.7</cxf.rx.java.version>
+ <cxf.rxjava.version>1.3.0</cxf.rxjava.version>
+ <cxf.rxjava2.version>2.1.3</cxf.rxjava2.version>
<cxf.javax.annotation-api.version>1.3</cxf.javax.annotation-api.version>
<cxf.jcache.version>1.0.0</cxf.jcache.version>
<cxf.geronimo.jms.version>1.1.1</cxf.geronimo.jms.version>
@@ -819,7 +820,12 @@
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
- <version>${cxf.rx.java.version}</version>
+ <version>${cxf.rxjava.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.reactivex.rxjava2</groupId>
+ <artifactId>rxjava</artifactId>
+ <version>${cxf.rxjava2.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/rt/rs/extensions/rx/pom.xml
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/pom.xml b/rt/rs/extensions/rx/pom.xml
index 13bf166..d6b0360 100644
--- a/rt/rs/extensions/rx/pom.xml
+++ b/rt/rs/extensions/rx/pom.xml
@@ -48,6 +48,12 @@
<optional>true</optional>
</dependency>
<dependency>
+ <groupId>io.reactivex.rxjava2</groupId>
+ <artifactId>rxjava</artifactId>
+ <scope>provided</scope>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvoker.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvoker.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvoker.java
new file mode 100644
index 0000000..41d1ec9
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvoker.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx2.client;
+
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.RxInvoker;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.Response;
+
+import io.reactivex.Observable;
+
+
+@SuppressWarnings("rawtypes")
+public interface ObservableRxInvoker extends RxInvoker<Observable> {
+
+ @Override
+ Observable<Response> get();
+
+ @Override
+ <T> Observable<T> get(Class<T> responseType);
+
+ @Override
+ <T> Observable<T> get(GenericType<T> responseType);
+
+ @Override
+ Observable<Response> put(Entity<?> entity);
+
+ @Override
+ <T> Observable<T> put(Entity<?> entity, Class<T> clazz);
+
+ @Override
+ <T> Observable<T> put(Entity<?> entity, GenericType<T> type);
+
+ @Override
+ Observable<Response> post(Entity<?> entity);
+
+ @Override
+ <T> Observable<T> post(Entity<?> entity, Class<T> clazz);
+
+ @Override
+ <T> Observable<T> post(Entity<?> entity, GenericType<T> type);
+
+ @Override
+ Observable<Response> delete();
+
+ @Override
+ <T> Observable<T> delete(Class<T> responseType);
+
+ @Override
+ <T> Observable<T> delete(GenericType<T> responseType);
+
+ @Override
+ Observable<Response> head();
+
+ @Override
+ Observable<Response> options();
+
+ @Override
+ <T> Observable<T> options(Class<T> responseType);
+
+ @Override
+ <T> Observable<T> options(GenericType<T> responseType);
+
+ @Override
+ Observable<Response> trace();
+
+ @Override
+ <T> Observable<T> trace(Class<T> responseType);
+
+ @Override
+ <T> Observable<T> trace(GenericType<T> responseType);
+
+ @Override
+ Observable<Response> method(String name);
+
+ @Override
+ <T> Observable<T> method(String name, Class<T> responseType);
+
+ @Override
+ <T> Observable<T> method(String name, GenericType<T> responseType);
+
+ @Override
+ Observable<Response> method(String name, Entity<?> entity);
+
+ @Override
+ <T> Observable<T> method(String name, Entity<?> entity, Class<T> responseType);
+
+ @Override
+ <T> Observable<T> method(String name, Entity<?> entity, GenericType<T> responseType);
+}
+
http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerImpl.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerImpl.java
new file mode 100644
index 0000000..2c1f966
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerImpl.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx2.client;
+
+import java.util.concurrent.ExecutorService;
+
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.Response;
+
+import org.apache.cxf.jaxrs.client.WebClient;
+
+import io.reactivex.Observable;
+import io.reactivex.Scheduler;
+import io.reactivex.schedulers.Schedulers;
+
+
+public class ObservableRxInvokerImpl implements ObservableRxInvoker {
+ private Scheduler sc;
+ private WebClient wc;
+ public ObservableRxInvokerImpl(WebClient wc, ExecutorService ex) {
+ this.wc = wc;
+ this.sc = ex == null ? null : Schedulers.from(ex);
+ }
+
+ @Override
+ public Observable<Response> get() {
+ return get(Response.class);
+ }
+
+ @Override
+ public <T> Observable<T> get(Class<T> responseType) {
+ return method(HttpMethod.GET, responseType);
+ }
+
+ @Override
+ public <T> Observable<T> get(GenericType<T> responseType) {
+ return method(HttpMethod.GET, responseType);
+ }
+
+ @Override
+ public Observable<Response> put(Entity<?> entity) {
+ return put(entity, Response.class);
+ }
+
+ @Override
+ public <T> Observable<T> put(Entity<?> entity, Class<T> responseType) {
+ return method(HttpMethod.PUT, entity, responseType);
+ }
+
+ @Override
+ public <T> Observable<T> put(Entity<?> entity, GenericType<T> responseType) {
+ return method(HttpMethod.PUT, entity, responseType);
+ }
+
+ @Override
+ public Observable<Response> post(Entity<?> entity) {
+ return post(entity, Response.class);
+ }
+
+ @Override
+ public <T> Observable<T> post(Entity<?> entity, Class<T> responseType) {
+ return method(HttpMethod.POST, entity, responseType);
+ }
+
+ @Override
+ public <T> Observable<T> post(Entity<?> entity, GenericType<T> responseType) {
+ return method(HttpMethod.POST, entity, responseType);
+ }
+
+ @Override
+ public Observable<Response> delete() {
+ return delete(Response.class);
+ }
+
+ @Override
+ public <T> Observable<T> delete(Class<T> responseType) {
+ return method(HttpMethod.DELETE, responseType);
+ }
+
+ @Override
+ public <T> Observable<T> delete(GenericType<T> responseType) {
+ return method(HttpMethod.DELETE, responseType);
+ }
+
+ @Override
+ public Observable<Response> head() {
+ return method(HttpMethod.HEAD);
+ }
+
+ @Override
+ public Observable<Response> options() {
+ return options(Response.class);
+ }
+
+ @Override
+ public <T> Observable<T> options(Class<T> responseType) {
+ return method(HttpMethod.OPTIONS, responseType);
+ }
+
+ @Override
+ public <T> Observable<T> options(GenericType<T> responseType) {
+ return method(HttpMethod.OPTIONS, responseType);
+ }
+
+ @Override
+ public Observable<Response> trace() {
+ return trace(Response.class);
+ }
+
+ @Override
+ public <T> Observable<T> trace(Class<T> responseType) {
+ return method("TRACE", responseType);
+ }
+
+ @Override
+ public <T> Observable<T> trace(GenericType<T> responseType) {
+ return method("TRACE", responseType);
+ }
+
+ @Override
+ public Observable<Response> method(String name) {
+ return method(name, Response.class);
+ }
+
+ @Override
+ public Observable<Response> method(String name, Entity<?> entity) {
+ return method(name, entity, Response.class);
+ }
+
+ @Override
+ public <T> Observable<T> method(String name, Entity<?> entity, Class<T> responseType) {
+ if (sc == null) {
+ return Observable.fromFuture(wc.async().method(name, entity, responseType));
+ }
+ return Observable.fromFuture(wc.async().method(name, entity, responseType), sc);
+ }
+
+ @Override
+ public <T> Observable<T> method(String name, Entity<?> entity, GenericType<T> responseType) {
+ if (sc == null) {
+ return Observable.fromFuture(wc.async().method(name, entity, responseType));
+ }
+ return Observable.fromFuture(wc.async().method(name, entity, responseType), sc);
+ }
+
+ @Override
+ public <T> Observable<T> method(String name, Class<T> responseType) {
+ if (sc == null) {
+ return Observable.fromFuture(wc.async().method(name, responseType));
+ }
+ return Observable.fromFuture(wc.async().method(name, responseType), sc);
+ }
+
+ @Override
+ public <T> Observable<T> method(String name, GenericType<T> responseType) {
+ if (sc == null) {
+ return Observable.fromFuture(wc.async().method(name, responseType));
+ }
+ return Observable.fromFuture(wc.async().method(name, responseType), sc);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerProvider.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerProvider.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerProvider.java
new file mode 100644
index 0000000..221bc48
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerProvider.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx2.client;
+
+import java.util.concurrent.ExecutorService;
+
+import javax.ws.rs.client.RxInvokerProvider;
+import javax.ws.rs.client.SyncInvoker;
+import javax.ws.rs.ext.Provider;
+
+import org.apache.cxf.jaxrs.client.SyncInvokerImpl;
+
+@Provider
+public class ObservableRxInvokerProvider implements RxInvokerProvider<ObservableRxInvoker> {
+
+ @Override
+ public ObservableRxInvoker getRxInvoker(SyncInvoker syncInvoker, ExecutorService executorService) {
+ // TODO: At the moment we still delegate if possible to the async HTTP conduit.
+ // Investigate if letting the RxJava thread pool deal with the sync invocation
+ // is indeed more effective
+ return new ObservableRxInvokerImpl(((SyncInvokerImpl)syncInvoker).getWebClient(), executorService);
+ }
+
+ @Override
+ public boolean isProviderFor(Class<?> rxCls) {
+ return ObservableRxInvoker.class == rxCls;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ObservableInvoker.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ObservableInvoker.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ObservableInvoker.java
new file mode 100644
index 0000000..8047c6a
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ObservableInvoker.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx2.server;
+
+import org.apache.cxf.jaxrs.JAXRSInvoker;
+import org.apache.cxf.jaxrs.impl.AsyncResponseImpl;
+import org.apache.cxf.message.Message;
+
+import io.reactivex.Observable;
+
+public class ObservableInvoker extends JAXRSInvoker {
+ protected AsyncResponseImpl checkFutureResponse(Message inMessage, Object result) {
+ if (result instanceof Observable) {
+ final Observable<?> obs = (Observable<?>)result;
+ final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
+ obs.subscribe(v -> asyncResponse.resume(v), t -> handleThrowable(asyncResponse, t));
+ return asyncResponse;
+ }
+ return null;
+ }
+
+ private Object handleThrowable(AsyncResponseImpl asyncResponse, Throwable t) {
+ //TODO: if it is a Cancelation exception => asyncResponse.cancel();
+ asyncResponse.resume(t);
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/systests/jaxrs/pom.xml
----------------------------------------------------------------------
diff --git a/systests/jaxrs/pom.xml b/systests/jaxrs/pom.xml
index b515011..542201a 100644
--- a/systests/jaxrs/pom.xml
+++ b/systests/jaxrs/pom.xml
@@ -60,7 +60,12 @@
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
- <version>${cxf.rx.java.version}</version>
+ <version>${cxf.rxjava.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.reactivex.rxjava2</groupId>
+ <artifactId>rxjava</artifactId>
+ <version>${cxf.rxjava2.version}</version>
</dependency>
<!--
<dependency>
http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSObservableTest.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSObservableTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSObservableTest.java
deleted file mode 100644
index 39d8fd5..0000000
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSObservableTest.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cxf.systest.jaxrs.reactive;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-
-import javax.ws.rs.NotFoundException;
-import javax.ws.rs.client.ClientBuilder;
-import javax.ws.rs.client.Invocation;
-import javax.ws.rs.core.GenericType;
-
-import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
-
-import org.apache.cxf.jaxrs.client.WebClient;
-import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
-import org.apache.cxf.jaxrs.rx.client.ObservableRxInvoker;
-import org.apache.cxf.jaxrs.rx.client.ObservableRxInvokerProvider;
-import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
-
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import rx.Observable;
-
-public class JAXRSObservableTest extends AbstractBusClientServerTestBase {
- public static final String PORT = ObservableServer.PORT;
- @BeforeClass
- public static void startServers() throws Exception {
- AbstractResourceInfo.clearAllMaps();
- assertTrue("server did not launch correctly",
- launchServer(ObservableServer.class, true));
- createStaticBus();
- }
- @Test
- public void testGetHelloWorldText() throws Exception {
- String address = "http://localhost:" + PORT + "/observable/text";
- WebClient wc = WebClient.create(address);
- String text = wc.accept("text/plain").get(String.class);
- assertEquals("Hello, world!", text);
- }
- @Test
- public void testGetHelloWorldAsyncText() throws Exception {
- String address = "http://localhost:" + PORT + "/observable/textAsync";
- WebClient wc = WebClient.create(address);
- String text = wc.accept("text/plain").get(String.class);
- assertEquals("Hello, world!", text);
- }
-
- @Test
- public void testGetHelloWorldJson() throws Exception {
- String address = "http://localhost:" + PORT + "/observable/textJson";
- WebClient wc = WebClient.create(address,
- Collections.singletonList(new JacksonJsonProvider()));
- HelloWorldBean bean = wc.accept("application/json").get(HelloWorldBean.class);
- assertEquals("Hello", bean.getGreeting());
- assertEquals("World", bean.getAudience());
- }
- @Test
- public void testGetHelloWorldJsonList() throws Exception {
- String address = "http://localhost:" + PORT + "/observable/textJsonList";
- doTestGetHelloWorldJsonList(address);
- }
- @Test
- public void testGetHelloWorldJsonImplicitListAsync() throws Exception {
- String address = "http://localhost:" + PORT + "/observable/textJsonImplicitListAsync";
- doTestGetHelloWorldJsonList(address);
- }
- @Test
- public void testGetHelloWorldJsonImplicitListAsyncStream() throws Exception {
- String address = "http://localhost:" + PORT + "/observable/textJsonImplicitListAsyncStream";
- doTestGetHelloWorldJsonList(address);
- }
- private void doTestGetHelloWorldJsonList(String address) throws Exception {
- WebClient wc = WebClient.create(address,
- Collections.singletonList(new JacksonJsonProvider()));
- WebClient.getConfig(wc).getHttpConduit().getClient().setReceiveTimeout(10000000);
- GenericType<List<HelloWorldBean>> genericResponseType = new GenericType<List<HelloWorldBean>>() {
- };
-
- List<HelloWorldBean> beans = wc.accept("application/json").get(genericResponseType);
- assertEquals(2, beans.size());
- assertEquals("Hello", beans.get(0).getGreeting());
- assertEquals("World", beans.get(0).getAudience());
- assertEquals("Ciao", beans.get(1).getGreeting());
- assertEquals("World", beans.get(1).getAudience());
- }
-
- @Test
- public void testGetHelloWorldAsyncObservable() throws Exception {
- String address = "http://localhost:" + PORT + "/observable/textAsync";
- WebClient wc = WebClient.create(address,
- Collections.singletonList(new ObservableRxInvokerProvider()));
- Observable<String> obs = wc.accept("text/plain")
- .rx(ObservableRxInvoker.class)
- .get(String.class);
- obs.map(s -> {
- return s + s;
- });
-
- Thread.sleep(3000);
-
- obs.subscribe(s -> assertDuplicateResponse(s));
- }
- @Test
- public void testGetHelloWorldAsyncObservable404() throws Exception {
- String address = "http://localhost:" + PORT + "/observable/textAsync404";
- Invocation.Builder b = ClientBuilder.newClient().register(new ObservableRxInvokerProvider())
- .target(address).request();
- b.rx(ObservableRxInvoker.class).get(String.class).subscribe(
- s -> {
- fail("Exception expected");
- },
- t -> validateT((ExecutionException)t));
- }
-
- private void validateT(ExecutionException t) {
- assertTrue(t.getCause() instanceof NotFoundException);
- }
- private void assertDuplicateResponse(String s) {
- assertEquals("Hello, world!Hello, world!", s);
- }
-}
http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2Test.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2Test.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2Test.java
new file mode 100644
index 0000000..ded2799
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2Test.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.xml.ws.Holder;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.jaxrs.client.WebClient;
+import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
+import org.apache.cxf.jaxrs.rx2.client.ObservableRxInvoker;
+import org.apache.cxf.jaxrs.rx2.client.ObservableRxInvokerProvider;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import io.reactivex.Observable;
+
+public class JAXRSRxJava2Test extends AbstractBusClientServerTestBase {
+ public static final String PORT = RxJava2Server.PORT;
+ @BeforeClass
+ public static void startServers() throws Exception {
+ AbstractResourceInfo.clearAllMaps();
+ assertTrue("server did not launch correctly",
+ launchServer(RxJava2Server.class, true));
+ createStaticBus();
+ }
+ @Test
+ public void testGetHelloWorldJson() throws Exception {
+ String address = "http://localhost:" + PORT + "/observable2/textJson";
+ List<Object> providers = new LinkedList<>();
+ providers.add(new JacksonJsonProvider());
+ providers.add(new ObservableRxInvokerProvider());
+ WebClient wc = WebClient.create(address, providers);
+ Observable<HelloWorldBean> obs = wc.accept("application/json")
+ .rx(ObservableRxInvoker.class)
+ .get(HelloWorldBean.class);
+
+ Holder<HelloWorldBean> holder = new Holder<HelloWorldBean>();
+ obs.subscribe(v -> {
+ holder.value = v;
+ });
+ Thread.sleep(3000);
+ assertEquals("Hello", holder.value.getGreeting());
+ assertEquals("World", holder.value.getAudience());
+ }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJavaTest.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJavaTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJavaTest.java
new file mode 100644
index 0000000..9f197d8
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJavaTest.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import javax.ws.rs.NotFoundException;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Invocation;
+import javax.ws.rs.core.GenericType;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.jaxrs.client.WebClient;
+import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
+import org.apache.cxf.jaxrs.rx.client.ObservableRxInvoker;
+import org.apache.cxf.jaxrs.rx.client.ObservableRxInvokerProvider;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import rx.Observable;
+
+public class JAXRSRxJavaTest extends AbstractBusClientServerTestBase {
+ public static final String PORT = RxJavaServer.PORT;
+ @BeforeClass
+ public static void startServers() throws Exception {
+ AbstractResourceInfo.clearAllMaps();
+ assertTrue("server did not launch correctly",
+ launchServer(RxJavaServer.class, true));
+ createStaticBus();
+ }
+ @Test
+ public void testGetHelloWorldText() throws Exception {
+ String address = "http://localhost:" + PORT + "/observable/text";
+ WebClient wc = WebClient.create(address);
+ String text = wc.accept("text/plain").get(String.class);
+ assertEquals("Hello, world!", text);
+ }
+ @Test
+ public void testGetHelloWorldAsyncText() throws Exception {
+ String address = "http://localhost:" + PORT + "/observable/textAsync";
+ WebClient wc = WebClient.create(address);
+ String text = wc.accept("text/plain").get(String.class);
+ assertEquals("Hello, world!", text);
+ }
+
+ @Test
+ public void testGetHelloWorldJson() throws Exception {
+ String address = "http://localhost:" + PORT + "/observable/textJson";
+ WebClient wc = WebClient.create(address,
+ Collections.singletonList(new JacksonJsonProvider()));
+ HelloWorldBean bean = wc.accept("application/json").get(HelloWorldBean.class);
+ assertEquals("Hello", bean.getGreeting());
+ assertEquals("World", bean.getAudience());
+ }
+ @Test
+ public void testGetHelloWorldJsonList() throws Exception {
+ String address = "http://localhost:" + PORT + "/observable/textJsonList";
+ doTestGetHelloWorldJsonList(address);
+ }
+ @Test
+ public void testGetHelloWorldJsonImplicitListAsync() throws Exception {
+ String address = "http://localhost:" + PORT + "/observable/textJsonImplicitListAsync";
+ doTestGetHelloWorldJsonList(address);
+ }
+ @Test
+ public void testGetHelloWorldJsonImplicitListAsyncStream() throws Exception {
+ String address = "http://localhost:" + PORT + "/observable/textJsonImplicitListAsyncStream";
+ doTestGetHelloWorldJsonList(address);
+ }
+ private void doTestGetHelloWorldJsonList(String address) throws Exception {
+ WebClient wc = WebClient.create(address,
+ Collections.singletonList(new JacksonJsonProvider()));
+ WebClient.getConfig(wc).getHttpConduit().getClient().setReceiveTimeout(10000000);
+ GenericType<List<HelloWorldBean>> genericResponseType = new GenericType<List<HelloWorldBean>>() {
+ };
+
+ List<HelloWorldBean> beans = wc.accept("application/json").get(genericResponseType);
+ assertEquals(2, beans.size());
+ assertEquals("Hello", beans.get(0).getGreeting());
+ assertEquals("World", beans.get(0).getAudience());
+ assertEquals("Ciao", beans.get(1).getGreeting());
+ assertEquals("World", beans.get(1).getAudience());
+ }
+
+ @Test
+ public void testGetHelloWorldAsyncObservable() throws Exception {
+ String address = "http://localhost:" + PORT + "/observable/textAsync";
+ WebClient wc = WebClient.create(address,
+ Collections.singletonList(new ObservableRxInvokerProvider()));
+ Observable<String> obs = wc.accept("text/plain")
+ .rx(ObservableRxInvoker.class)
+ .get(String.class);
+ obs.map(s -> {
+ return s + s;
+ });
+
+ Thread.sleep(3000);
+
+ obs.subscribe(s -> assertDuplicateResponse(s));
+ }
+ @Test
+ public void testGetHelloWorldAsyncObservable404() throws Exception {
+ String address = "http://localhost:" + PORT + "/observable/textAsync404";
+ Invocation.Builder b = ClientBuilder.newClient().register(new ObservableRxInvokerProvider())
+ .target(address).request();
+ b.rx(ObservableRxInvoker.class).get(String.class).subscribe(
+ s -> {
+ fail("Exception expected");
+ },
+ t -> validateT((ExecutionException)t));
+ }
+
+ private void validateT(ExecutionException t) {
+ assertTrue(t.getCause() instanceof NotFoundException);
+ }
+ private void assertDuplicateResponse(String s) {
+ assertEquals("Hello, world!Hello, world!", s);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ObservableServer.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ObservableServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ObservableServer.java
deleted file mode 100644
index 03f89ef..0000000
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ObservableServer.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cxf.systest.jaxrs.reactive;
-
-import java.util.Collections;
-
-import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
-
-import org.apache.cxf.Bus;
-import org.apache.cxf.BusFactory;
-import org.apache.cxf.ext.logging.LoggingOutInterceptor;
-import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
-import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
-import org.apache.cxf.jaxrs.provider.StreamingResponseProvider;
-import org.apache.cxf.jaxrs.rx.server.ObservableInvoker;
-import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
-
-
-public class ObservableServer extends AbstractBusTestServerBase {
- public static final String PORT = allocatePort(ObservableServer.class);
-
- org.apache.cxf.endpoint.Server server;
- public ObservableServer() {
- }
-
- protected void run() {
- Bus bus = BusFactory.getDefaultBus();
- // Make sure default JSONProvider is not loaded
- bus.setProperty("skip.default.json.provider.registration", true);
- JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
- sf.setInvoker(new ObservableInvoker());
- sf.setProvider(new JacksonJsonProvider());
- StreamingResponseProvider<HelloWorldBean> streamProvider = new StreamingResponseProvider<HelloWorldBean>();
- streamProvider.setProduceMediaTypes(Collections.singletonList("application/json"));
- sf.setProvider(streamProvider);
- sf.getOutInterceptors().add(new LoggingOutInterceptor());
- sf.setResourceClasses(ObservableService.class);
- sf.setResourceProvider(ObservableService.class,
- new SingletonResourceProvider(new ObservableService(), true));
- sf.setAddress("http://localhost:" + PORT + "/");
- server = sf.create();
- }
-
- public void tearDown() throws Exception {
- server.stop();
- server.destroy();
- server = null;
- }
-
- public static void main(String[] args) {
- try {
- ObservableServer s = new ObservableServer();
- s.start();
- } catch (Exception ex) {
- ex.printStackTrace();
- System.exit(-1);
- } finally {
- System.out.println("done!");
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ObservableService.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ObservableService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ObservableService.java
deleted file mode 100644
index 00783fd..0000000
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ObservableService.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cxf.systest.jaxrs.reactive;
-
-
-import java.util.Arrays;
-import java.util.List;
-
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.container.AsyncResponse;
-import javax.ws.rs.container.Suspended;
-
-import org.apache.cxf.jaxrs.rx.server.AbstractAsyncSubscriber;
-import org.apache.cxf.jaxrs.rx.server.JsonStreamingAsyncSubscriber;
-import org.apache.cxf.jaxrs.rx.server.ListAsyncSubscriber;
-
-import rx.Observable;
-import rx.schedulers.Schedulers;
-
-
-@Path("/observable")
-public class ObservableService {
-
- @GET
- @Produces("text/plain")
- @Path("text")
- public Observable<String> getText() {
- return Observable.just("Hello, world!");
- }
-
- @GET
- @Produces("text/plain")
- @Path("textAsync")
- public void getTextAsync(@Suspended final AsyncResponse ar) {
- Observable.just("Hello, ").map(s -> s + "world!")
- .subscribe(new StringAsyncSubscriber(ar));
-
- }
-
- @GET
- @Produces("application/json")
- @Path("textJson")
- public Observable<HelloWorldBean> getJson() {
- return Observable.just(new HelloWorldBean());
- }
-
- @GET
- @Produces("application/json")
- @Path("textJsonImplicitListAsync")
- public void getJsonImplicitListAsync(@Suspended AsyncResponse ar) {
- final HelloWorldBean bean1 = new HelloWorldBean();
- final HelloWorldBean bean2 = new HelloWorldBean("Ciao");
- new Thread(new Runnable() {
- public void run() {
- try {
- Thread.sleep(2000);
- } catch (InterruptedException ex) {
- // ignore
- }
- Observable.just(bean1, bean2).subscribe(new ListAsyncSubscriber<HelloWorldBean>(ar));
- }
- }).start();
-
- }
- @GET
- @Produces("application/json")
- @Path("textJsonImplicitListAsyncStream")
- public void getJsonImplicitListStreamingAsync(@Suspended AsyncResponse ar) {
- Observable.just("Hello", "Ciao")
- .map(s -> new HelloWorldBean(s))
- .subscribeOn(Schedulers.computation())
- .subscribe(new JsonStreamingAsyncSubscriber<HelloWorldBean>(ar));
- }
- @GET
- @Produces("application/json")
- @Path("textJsonList")
- public Observable<List<HelloWorldBean>> getJsonList() {
- HelloWorldBean bean1 = new HelloWorldBean();
- HelloWorldBean bean2 = new HelloWorldBean();
- bean2.setGreeting("Ciao");
- return Observable.just(Arrays.asList(bean1, bean2));
- }
-
- private class StringAsyncSubscriber extends AbstractAsyncSubscriber<String> {
-
- private StringBuilder sb = new StringBuilder();
- StringAsyncSubscriber(AsyncResponse ar) {
- super(ar);
- }
- @Override
- public void onCompleted() {
- super.resume(sb.toString());
- }
-
- @Override
- public void onNext(String s) {
- sb.append(s);
- }
-
- }
-}
-
-
http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java
new file mode 100644
index 0000000..28d053e
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+
+import io.reactivex.Observable;
+
+
+@Path("/observable2")
+public class RxJava2ObservableService {
+
+
+ @GET
+ @Produces("application/json")
+ @Path("textJson")
+ public Observable<HelloWorldBean> getJson() {
+ return Observable.just(new HelloWorldBean());
+ }
+
+}
+
+
http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2Server.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2Server.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2Server.java
new file mode 100644
index 0000000..f9ab3ae
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2Server.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.ext.logging.LoggingOutInterceptor;
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
+import org.apache.cxf.jaxrs.rx2.server.ObservableInvoker;
+import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
+
+
+/**
+ * Test server that publishes RxJava2ObservableService over HTTP on the port
+ * allocated for this class, using the rx2 ObservableInvoker and Jackson for JSON.
+ */
+public class RxJava2Server extends AbstractBusTestServerBase {
+ public static final String PORT = allocatePort(RxJava2Server.class);
+
+ org.apache.cxf.endpoint.Server server;
+ public RxJava2Server() {
+ }
+
+ /** Configures the JAX-RS endpoint and creates the server. */
+ protected void run() {
+ Bus bus = BusFactory.getDefaultBus();
+ // Make sure default JSONProvider is not loaded
+ bus.setProperty("skip.default.json.provider.registration", true);
+ JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
+ sf.setInvoker(new ObservableInvoker());
+ sf.setProvider(new JacksonJsonProvider());
+ sf.getOutInterceptors().add(new LoggingOutInterceptor());
+ sf.setResourceClasses(RxJava2ObservableService.class);
+ sf.setResourceProvider(RxJava2ObservableService.class,
+ new SingletonResourceProvider(new RxJava2ObservableService(), true));
+ sf.setAddress("http://localhost:" + PORT + "/");
+ server = sf.create();
+ }
+
+ /**
+ * Stops and destroys the server created by run().
+ * Does nothing when no server was created, so that a failed start-up is not
+ * masked by a NullPointerException raised here.
+ */
+ public void tearDown() throws Exception {
+ if (server == null) {
+ return;
+ }
+ server.stop();
+ server.destroy();
+ server = null;
+ }
+
+ /**
+ * Starts the server stand-alone. If start-up throws, the stack trace is
+ * printed and the JVM exits with status -1.
+ */
+ public static void main(String[] args) {
+ try {
+ RxJava2Server s = new RxJava2Server();
+ s.start();
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ System.exit(-1);
+ } finally {
+ System.out.println("done!");
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaObservableService.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaObservableService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaObservableService.java
new file mode 100644
index 0000000..de0f91f
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaObservableService.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+
+import java.util.Arrays;
+import java.util.List;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+
+import org.apache.cxf.jaxrs.rx.server.AbstractAsyncSubscriber;
+import org.apache.cxf.jaxrs.rx.server.JsonStreamingAsyncSubscriber;
+import org.apache.cxf.jaxrs.rx.server.ListAsyncSubscriber;
+
+import rx.Observable;
+import rx.schedulers.Schedulers;
+
+
+/**
+ * Test resource rooted at /observable that exposes rx.Observable results, both
+ * as method return values and through suspended AsyncResponse instances.
+ */
+@Path("/observable")
+public class RxJavaObservableService {
+
+ /** Returns an Observable emitting the single string "Hello, world!". */
+ @GET
+ @Produces("text/plain")
+ @Path("text")
+ public Observable<String> getText() {
+ return Observable.just("Hello, world!");
+ }
+
+ /**
+ * Maps "Hello, " to "Hello, world!" and subscribes a StringAsyncSubscriber
+ * bound to the suspended response.
+ */
+ @GET
+ @Produces("text/plain")
+ @Path("textAsync")
+ public void getTextAsync(@Suspended final AsyncResponse ar) {
+ Observable.just("Hello, ").map(s -> s + "world!")
+ .subscribe(new StringAsyncSubscriber(ar));
+
+ }
+
+ /** Returns an Observable emitting one default-constructed HelloWorldBean. */
+ @GET
+ @Produces("application/json")
+ @Path("textJson")
+ public Observable<HelloWorldBean> getJson() {
+ return Observable.just(new HelloWorldBean());
+ }
+
+ /**
+ * Starts a separate thread that sleeps for two seconds and then feeds two
+ * beans to a ListAsyncSubscriber bound to the suspended response.
+ */
+ @GET
+ @Produces("application/json")
+ @Path("textJsonImplicitListAsync")
+ public void getJsonImplicitListAsync(@Suspended AsyncResponse ar) {
+ final HelloWorldBean bean1 = new HelloWorldBean();
+ final HelloWorldBean bean2 = new HelloWorldBean("Ciao");
+ new Thread(() -> {
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException ex) {
+ // Restore the interrupt flag rather than silently dropping it;
+ // the beans are still published below, as before.
+ Thread.currentThread().interrupt();
+ }
+ Observable.just(bean1, bean2).subscribe(new ListAsyncSubscriber<HelloWorldBean>(ar));
+ }).start();
+
+ }
+
+ /**
+ * Maps two greetings to beans, subscribes on the computation scheduler and
+ * hands the beans to a JsonStreamingAsyncSubscriber bound to the response.
+ */
+ @GET
+ @Produces("application/json")
+ @Path("textJsonImplicitListAsyncStream")
+ public void getJsonImplicitListStreamingAsync(@Suspended AsyncResponse ar) {
+ Observable.just("Hello", "Ciao")
+ .map(s -> new HelloWorldBean(s))
+ .subscribeOn(Schedulers.computation())
+ .subscribe(new JsonStreamingAsyncSubscriber<HelloWorldBean>(ar));
+ }
+
+ /**
+ * Returns an Observable emitting a single list of two beans, the second
+ * with its greeting set to "Ciao".
+ */
+ @GET
+ @Produces("application/json")
+ @Path("textJsonList")
+ public Observable<List<HelloWorldBean>> getJsonList() {
+ HelloWorldBean bean1 = new HelloWorldBean();
+ HelloWorldBean bean2 = new HelloWorldBean();
+ bean2.setGreeting("Ciao");
+ return Observable.just(Arrays.asList(bean1, bean2));
+ }
+
+ /**
+ * Concatenates every emitted string and resumes with the combined text on
+ * completion. Declared static because it uses no state of the enclosing
+ * resource, which avoids a hidden reference to it.
+ */
+ private static class StringAsyncSubscriber extends AbstractAsyncSubscriber<String> {
+
+ private final StringBuilder sb = new StringBuilder();
+ StringAsyncSubscriber(AsyncResponse ar) {
+ super(ar);
+ }
+ @Override
+ public void onCompleted() {
+ super.resume(sb.toString());
+ }
+
+ @Override
+ public void onNext(String s) {
+ sb.append(s);
+ }
+
+ }
+}
+
+
http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaServer.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaServer.java
new file mode 100644
index 0000000..70f58b3
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaServer.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import java.util.Collections;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.ext.logging.LoggingOutInterceptor;
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
+import org.apache.cxf.jaxrs.provider.StreamingResponseProvider;
+import org.apache.cxf.jaxrs.rx.server.ObservableInvoker;
+import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
+
+
+/**
+ * Test server that publishes RxJavaObservableService over HTTP on the port
+ * allocated for this class, using the rx ObservableInvoker, Jackson for JSON
+ * and a StreamingResponseProvider producing application/json.
+ */
+public class RxJavaServer extends AbstractBusTestServerBase {
+ public static final String PORT = allocatePort(RxJavaServer.class);
+
+ org.apache.cxf.endpoint.Server server;
+ public RxJavaServer() {
+ }
+
+ /** Configures the JAX-RS endpoint and creates the server. */
+ protected void run() {
+ Bus bus = BusFactory.getDefaultBus();
+ // Make sure default JSONProvider is not loaded
+ bus.setProperty("skip.default.json.provider.registration", true);
+ JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
+ sf.setInvoker(new ObservableInvoker());
+ sf.setProvider(new JacksonJsonProvider());
+ StreamingResponseProvider<HelloWorldBean> streamProvider = new StreamingResponseProvider<HelloWorldBean>();
+ streamProvider.setProduceMediaTypes(Collections.singletonList("application/json"));
+ sf.setProvider(streamProvider);
+ sf.getOutInterceptors().add(new LoggingOutInterceptor());
+ sf.setResourceClasses(RxJavaObservableService.class);
+ sf.setResourceProvider(RxJavaObservableService.class,
+ new SingletonResourceProvider(new RxJavaObservableService(), true));
+ sf.setAddress("http://localhost:" + PORT + "/");
+ server = sf.create();
+ }
+
+ /**
+ * Stops and destroys the server created by run().
+ * Does nothing when no server was created, so that a failed start-up is not
+ * masked by a NullPointerException raised here.
+ */
+ public void tearDown() throws Exception {
+ if (server == null) {
+ return;
+ }
+ server.stop();
+ server.destroy();
+ server = null;
+ }
+
+ /**
+ * Starts the server stand-alone. If start-up throws, the stack trace is
+ * printed and the JVM exits with status -1.
+ */
+ public static void main(String[] args) {
+ try {
+ RxJavaServer s = new RxJavaServer();
+ s.start();
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ System.exit(-1);
+ } finally {
+ System.out.println("done!");
+ }
+ }
+
+}