You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by se...@apache.org on 2017/08/30 13:02:07 UTC
[2/2] cxf git commit: [CXF-7487] More work around supporting RxJava2
[CXF-7487] More work around supporting RxJava2
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/253900af
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/253900af
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/253900af
Branch: refs/heads/master
Commit: 253900af377eff63e8010d86e2375e4b5c426ed6
Parents: 0872b79
Author: Sergey Beryozkin <sb...@gmail.com>
Authored: Wed Aug 30 14:01:51 2017 +0100
Committer: Sergey Beryozkin <sb...@gmail.com>
Committed: Wed Aug 30 14:01:51 2017 +0100
----------------------------------------------------------------------
.../cxf/jaxrs/swagger/SwaggerUiResolver.java | 16 +-
.../rx/server/AbstractAsyncSubscriber.java | 56 ------
.../rx/server/JsonStreamingAsyncSubscriber.java | 34 ----
.../jaxrs/rx/server/ListAsyncSubscriber.java | 42 -----
.../rx/server/StreamingAsyncSubscriber.java | 124 -------------
.../cxf/jaxrs/rx2/client/FlowableRxInvoker.java | 107 +++++++++++
.../jaxrs/rx2/client/FlowableRxInvokerImpl.java | 180 +++++++++++++++++++
.../rx2/client/FlowableRxInvokerProvider.java | 45 +++++
.../rx2/server/AbstractAsyncSubscriber.java | 65 +++++++
.../cxf/jaxrs/rx2/server/FlowableInvoker.java | 43 +++++
.../server/JsonStreamingAsyncSubscriber.java | 34 ++++
.../rx2/server/StreamingAsyncSubscriber.java | 126 +++++++++++++
.../reactive/JAXRSRxJava2FlowableTest.java | 146 +++++++++++++++
.../reactive/JAXRSRxJava2ObservableTest.java | 68 +++++++
.../jaxrs/reactive/JAXRSRxJava2Test.java | 68 -------
.../reactive/JAXRSRxJavaObservableTest.java | 83 +++++++++
.../systest/jaxrs/reactive/JAXRSRxJavaTest.java | 141 ---------------
.../jaxrs/reactive/RxJava2FlowableServer.java | 79 ++++++++
.../jaxrs/reactive/RxJava2FlowableService.java | 126 +++++++++++++
.../jaxrs/reactive/RxJava2ObservableServer.java | 73 ++++++++
.../reactive/RxJava2ObservableService.java | 3 +-
.../systest/jaxrs/reactive/RxJava2Server.java | 73 --------
.../jaxrs/reactive/RxJavaObservableServer.java | 79 ++++++++
.../jaxrs/reactive/RxJavaObservableService.java | 63 +------
.../systest/jaxrs/reactive/RxJavaServer.java | 79 --------
25 files changed, 1265 insertions(+), 688 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/rt/rs/description-swagger/src/main/java/org/apache/cxf/jaxrs/swagger/SwaggerUiResolver.java
----------------------------------------------------------------------
diff --git a/rt/rs/description-swagger/src/main/java/org/apache/cxf/jaxrs/swagger/SwaggerUiResolver.java b/rt/rs/description-swagger/src/main/java/org/apache/cxf/jaxrs/swagger/SwaggerUiResolver.java
index 60f7d20..0e5b90a 100644
--- a/rt/rs/description-swagger/src/main/java/org/apache/cxf/jaxrs/swagger/SwaggerUiResolver.java
+++ b/rt/rs/description-swagger/src/main/java/org/apache/cxf/jaxrs/swagger/SwaggerUiResolver.java
@@ -52,16 +52,16 @@ public class SwaggerUiResolver {
return root;
}
}
- } else {
- Enumeration<URL> urls = cl.getResources(UI_RESOURCES_ROOT_START);
- while (urls.hasMoreElements()) {
- String urlStr = urls.nextElement().toString().replace(UI_RESOURCES_ROOT_START, "");
- String root = checkUiRoot(urlStr, swaggerUiMavenGroupAndArtifact, swaggerUiVersion);
- if (root != null) {
- return root;
- }
+ }
+ Enumeration<URL> urls = cl.getResources(UI_RESOURCES_ROOT_START);
+ while (urls.hasMoreElements()) {
+ String urlStr = urls.nextElement().toString().replace(UI_RESOURCES_ROOT_START, "");
+ String root = checkUiRoot(urlStr, swaggerUiMavenGroupAndArtifact, swaggerUiVersion);
+ if (root != null) {
+ return root;
}
}
+
} catch (Throwable ex) {
// ignore
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/AbstractAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/AbstractAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/AbstractAsyncSubscriber.java
deleted file mode 100644
index 5e27454..0000000
--- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/AbstractAsyncSubscriber.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.rx.server;
-
-import java.util.List;
-
-import javax.ws.rs.container.AsyncResponse;
-
-import org.apache.cxf.jaxrs.ext.StreamingResponse;
-
-import rx.Subscriber;
-
-public abstract class AbstractAsyncSubscriber<T> extends Subscriber<T> {
-
- private AsyncResponse ar;
-
- protected AbstractAsyncSubscriber(AsyncResponse ar) {
- this.ar = ar;
- }
- public void resume(T response) {
- ar.resume(response);
- }
-
- public void resume(List<T> response) {
- ar.resume(response);
- }
-
- public void resume(StreamingResponse<T> response) {
- ar.resume(response);
- }
-
- @Override
- public void onError(Throwable t) {
- ar.resume(t);
- }
-
- protected AsyncResponse getAsyncResponse() {
- return ar;
- }
-}
http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/JsonStreamingAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/JsonStreamingAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/JsonStreamingAsyncSubscriber.java
deleted file mode 100644
index f90b4b1..0000000
--- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/JsonStreamingAsyncSubscriber.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.rx.server;
-
-import javax.ws.rs.container.AsyncResponse;
-
-public class JsonStreamingAsyncSubscriber<T> extends StreamingAsyncSubscriber<T> {
- public JsonStreamingAsyncSubscriber(AsyncResponse ar) {
- this(ar, 1000);
- }
- public JsonStreamingAsyncSubscriber(AsyncResponse ar, long pollTimeout) {
- this(ar, pollTimeout, 0);
- }
- public JsonStreamingAsyncSubscriber(AsyncResponse ar, long pollTimeout, long asyncTimeout) {
- super(ar, "[", "]", ",", pollTimeout, asyncTimeout);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/ListAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/ListAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/ListAsyncSubscriber.java
deleted file mode 100644
index cdbc383..0000000
--- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/ListAsyncSubscriber.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.rx.server;
-
-import java.util.LinkedList;
-import java.util.List;
-
-import javax.ws.rs.container.AsyncResponse;
-
-public class ListAsyncSubscriber<T> extends AbstractAsyncSubscriber<T> {
-
- private List<T> beans = new LinkedList<T>();
- public ListAsyncSubscriber(AsyncResponse ar) {
- super(ar);
- }
- @Override
- public void onCompleted() {
- super.resume(beans);
- }
-
- @Override
- public void onNext(T bean) {
- beans.add(bean);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/StreamingAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/StreamingAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/StreamingAsyncSubscriber.java
deleted file mode 100644
index 1571973..0000000
--- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/StreamingAsyncSubscriber.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.rx.server;
-
-import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.ws.rs.container.AsyncResponse;
-import javax.ws.rs.container.TimeoutHandler;
-
-import org.apache.cxf.common.util.StringUtils;
-import org.apache.cxf.jaxrs.ext.StreamingResponse;
-
-public class StreamingAsyncSubscriber<T> extends AbstractAsyncSubscriber<T> {
-
- private BlockingQueue<T> queue = new LinkedBlockingQueue<T>();
- private String openTag;
- private String closeTag;
- private String separator;
- private long pollTimeout;
- private long asyncTimeout;
- private volatile boolean completed;
- private AtomicBoolean firstWriteDone = new AtomicBoolean();
- public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep) {
- this(ar, openTag, closeTag, "", 1000);
- }
- public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep,
- long pollTimeout) {
- this(ar, openTag, closeTag, sep, pollTimeout, 0);
- }
- public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep,
- long pollTimeout, long asyncTimeout) {
- super(ar);
- this.openTag = openTag;
- this.closeTag = closeTag;
- this.separator = sep;
- this.pollTimeout = pollTimeout;
- this.asyncTimeout = 0;
- if (asyncTimeout > 0) {
- ar.setTimeout(asyncTimeout, TimeUnit.MILLISECONDS);
- ar.setTimeoutHandler(new TimeoutHandlerImpl());
- }
- }
- @Override
- public void onStart() {
- if (asyncTimeout == 0) {
- resumeAsyncResponse();
- }
- }
- private void resumeAsyncResponse() {
- super.resume(new StreamingResponseImpl());
- }
- @Override
- public void onCompleted() {
- completed = true;
- }
-
- @Override
- public void onNext(T bean) {
- if (asyncTimeout > 0 && getAsyncResponse().isSuspended()) {
- resumeAsyncResponse();
- }
- queue.add(bean);
- }
- private class StreamingResponseImpl implements StreamingResponse<T> {
-
- @Override
- public void writeTo(Writer<T> writer) throws IOException {
- if (openTag != null) {
- writer.getEntityStream().write(StringUtils.toBytesUTF8(openTag));
- }
- while (!completed || !queue.isEmpty()) {
- try {
- T bean = queue.poll(pollTimeout, TimeUnit.MILLISECONDS);
- if (bean != null) {
- if (firstWriteDone.getAndSet(true)) {
- writer.getEntityStream().write(StringUtils.toBytesUTF8(separator));
- }
- writer.write(bean);
- }
- } catch (InterruptedException ex) {
- // ignore
- }
- }
- if (closeTag != null) {
- writer.getEntityStream().write(StringUtils.toBytesUTF8(closeTag));
- }
-
- }
-
- }
- public class TimeoutHandlerImpl implements TimeoutHandler {
-
- @Override
- public void handleTimeout(AsyncResponse asyncResponse) {
- if (queue.isEmpty()) {
- asyncResponse.setTimeout(asyncTimeout, TimeUnit.MILLISECONDS);
- } else {
- resumeAsyncResponse();
- }
-
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvoker.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvoker.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvoker.java
new file mode 100644
index 0000000..b306461
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvoker.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx2.client;
+
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.RxInvoker;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.Response;
+
+import io.reactivex.Flowable;
+
+
+@SuppressWarnings("rawtypes")
+public interface FlowableRxInvoker extends RxInvoker<Flowable> {
+
+ @Override
+ Flowable<Response> get();
+
+ @Override
+ <T> Flowable<T> get(Class<T> responseType);
+
+ @Override
+ <T> Flowable<T> get(GenericType<T> responseType);
+
+ @Override
+ Flowable<Response> put(Entity<?> entity);
+
+ @Override
+ <T> Flowable<T> put(Entity<?> entity, Class<T> clazz);
+
+ @Override
+ <T> Flowable<T> put(Entity<?> entity, GenericType<T> type);
+
+ @Override
+ Flowable<Response> post(Entity<?> entity);
+
+ @Override
+ <T> Flowable<T> post(Entity<?> entity, Class<T> clazz);
+
+ @Override
+ <T> Flowable<T> post(Entity<?> entity, GenericType<T> type);
+
+ @Override
+ Flowable<Response> delete();
+
+ @Override
+ <T> Flowable<T> delete(Class<T> responseType);
+
+ @Override
+ <T> Flowable<T> delete(GenericType<T> responseType);
+
+ @Override
+ Flowable<Response> head();
+
+ @Override
+ Flowable<Response> options();
+
+ @Override
+ <T> Flowable<T> options(Class<T> responseType);
+
+ @Override
+ <T> Flowable<T> options(GenericType<T> responseType);
+
+ @Override
+ Flowable<Response> trace();
+
+ @Override
+ <T> Flowable<T> trace(Class<T> responseType);
+
+ @Override
+ <T> Flowable<T> trace(GenericType<T> responseType);
+
+ @Override
+ Flowable<Response> method(String name);
+
+ @Override
+ <T> Flowable<T> method(String name, Class<T> responseType);
+
+ @Override
+ <T> Flowable<T> method(String name, GenericType<T> responseType);
+
+ @Override
+ Flowable<Response> method(String name, Entity<?> entity);
+
+ @Override
+ <T> Flowable<T> method(String name, Entity<?> entity, Class<T> responseType);
+
+ @Override
+ <T> Flowable<T> method(String name, Entity<?> entity, GenericType<T> responseType);
+}
+
http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerImpl.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerImpl.java
new file mode 100644
index 0000000..be5c2d5
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerImpl.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx2.client;
+
+import java.util.concurrent.ExecutorService;
+
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.Response;
+
+import org.apache.cxf.jaxrs.client.WebClient;
+
+import io.reactivex.Flowable;
+import io.reactivex.Scheduler;
+import io.reactivex.schedulers.Schedulers;
+
+
+public class FlowableRxInvokerImpl implements FlowableRxInvoker {
+ private Scheduler sc;
+ private WebClient wc;
+ public FlowableRxInvokerImpl(WebClient wc, ExecutorService ex) {
+ this.wc = wc;
+ this.sc = ex == null ? null : Schedulers.from(ex);
+ }
+
+ @Override
+ public Flowable<Response> get() {
+ return get(Response.class);
+ }
+
+ @Override
+ public <T> Flowable<T> get(Class<T> responseType) {
+ return method(HttpMethod.GET, responseType);
+ }
+
+ @Override
+ public <T> Flowable<T> get(GenericType<T> responseType) {
+ return method(HttpMethod.GET, responseType);
+ }
+
+ @Override
+ public Flowable<Response> put(Entity<?> entity) {
+ return put(entity, Response.class);
+ }
+
+ @Override
+ public <T> Flowable<T> put(Entity<?> entity, Class<T> responseType) {
+ return method(HttpMethod.PUT, entity, responseType);
+ }
+
+ @Override
+ public <T> Flowable<T> put(Entity<?> entity, GenericType<T> responseType) {
+ return method(HttpMethod.PUT, entity, responseType);
+ }
+
+ @Override
+ public Flowable<Response> post(Entity<?> entity) {
+ return post(entity, Response.class);
+ }
+
+ @Override
+ public <T> Flowable<T> post(Entity<?> entity, Class<T> responseType) {
+ return method(HttpMethod.POST, entity, responseType);
+ }
+
+ @Override
+ public <T> Flowable<T> post(Entity<?> entity, GenericType<T> responseType) {
+ return method(HttpMethod.POST, entity, responseType);
+ }
+
+ @Override
+ public Flowable<Response> delete() {
+ return delete(Response.class);
+ }
+
+ @Override
+ public <T> Flowable<T> delete(Class<T> responseType) {
+ return method(HttpMethod.DELETE, responseType);
+ }
+
+ @Override
+ public <T> Flowable<T> delete(GenericType<T> responseType) {
+ return method(HttpMethod.DELETE, responseType);
+ }
+
+ @Override
+ public Flowable<Response> head() {
+ return method(HttpMethod.HEAD);
+ }
+
+ @Override
+ public Flowable<Response> options() {
+ return options(Response.class);
+ }
+
+ @Override
+ public <T> Flowable<T> options(Class<T> responseType) {
+ return method(HttpMethod.OPTIONS, responseType);
+ }
+
+ @Override
+ public <T> Flowable<T> options(GenericType<T> responseType) {
+ return method(HttpMethod.OPTIONS, responseType);
+ }
+
+ @Override
+ public Flowable<Response> trace() {
+ return trace(Response.class);
+ }
+
+ @Override
+ public <T> Flowable<T> trace(Class<T> responseType) {
+ return method("TRACE", responseType);
+ }
+
+ @Override
+ public <T> Flowable<T> trace(GenericType<T> responseType) {
+ return method("TRACE", responseType);
+ }
+
+ @Override
+ public Flowable<Response> method(String name) {
+ return method(name, Response.class);
+ }
+
+ @Override
+ public Flowable<Response> method(String name, Entity<?> entity) {
+ return method(name, entity, Response.class);
+ }
+
+ @Override
+ public <T> Flowable<T> method(String name, Entity<?> entity, Class<T> responseType) {
+ if (sc == null) {
+ return Flowable.fromFuture(wc.async().method(name, entity, responseType));
+ }
+ return Flowable.fromFuture(wc.async().method(name, entity, responseType), sc);
+ }
+
+ @Override
+ public <T> Flowable<T> method(String name, Entity<?> entity, GenericType<T> responseType) {
+ if (sc == null) {
+ return Flowable.fromFuture(wc.async().method(name, entity, responseType));
+ }
+ return Flowable.fromFuture(wc.async().method(name, entity, responseType), sc);
+ }
+
+ @Override
+ public <T> Flowable<T> method(String name, Class<T> responseType) {
+ if (sc == null) {
+ return Flowable.fromFuture(wc.async().method(name, responseType));
+ }
+ return Flowable.fromFuture(wc.async().method(name, responseType), sc);
+ }
+
+ @Override
+ public <T> Flowable<T> method(String name, GenericType<T> responseType) {
+ if (sc == null) {
+ return Flowable.fromFuture(wc.async().method(name, responseType));
+ }
+ return Flowable.fromFuture(wc.async().method(name, responseType), sc);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerProvider.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerProvider.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerProvider.java
new file mode 100644
index 0000000..e4e0e71
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerProvider.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx2.client;
+
+import java.util.concurrent.ExecutorService;
+
+import javax.ws.rs.client.RxInvokerProvider;
+import javax.ws.rs.client.SyncInvoker;
+import javax.ws.rs.ext.Provider;
+
+import org.apache.cxf.jaxrs.client.SyncInvokerImpl;
+
+@Provider
+public class FlowableRxInvokerProvider implements RxInvokerProvider<FlowableRxInvoker> {
+
+ @Override
+ public FlowableRxInvoker getRxInvoker(SyncInvoker syncInvoker, ExecutorService executorService) {
+ // TODO: At the moment we still delegate if possible to the async HTTP conduit.
+ // Investigate if letting the RxJava thread pool deal with the sync invocation
+ // is indeed more effective
+ return new FlowableRxInvokerImpl(((SyncInvokerImpl)syncInvoker).getWebClient(), executorService);
+ }
+
+ @Override
+ public boolean isProviderFor(Class<?> rxCls) {
+ return FlowableRxInvoker.class == rxCls;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/AbstractAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/AbstractAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/AbstractAsyncSubscriber.java
new file mode 100644
index 0000000..17664c2
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/AbstractAsyncSubscriber.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx2.server;
+
+import java.util.List;
+
+import javax.ws.rs.container.AsyncResponse;
+
+import org.apache.cxf.jaxrs.ext.StreamingResponse;
+
+import io.reactivex.subscribers.DefaultSubscriber;
+
+public abstract class AbstractAsyncSubscriber<T> extends DefaultSubscriber<T> {
+
+ private AsyncResponse ar;
+
+ protected AbstractAsyncSubscriber(AsyncResponse ar) {
+ this.ar = ar;
+ }
+ public void resume(T response) {
+ ar.resume(response);
+ }
+
+ public void resume(List<T> response) {
+ ar.resume(response);
+ }
+
+ public void resume(StreamingResponse<T> response) {
+ ar.resume(response);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ ar.resume(t);
+ }
+
+ protected AsyncResponse getAsyncResponse() {
+ return ar;
+ }
+
+ @Override
+ public void onStart() {
+ requestNext();
+ }
+
+ protected void requestNext() {
+ super.request(1);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/FlowableInvoker.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/FlowableInvoker.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/FlowableInvoker.java
new file mode 100644
index 0000000..1ff7491
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/FlowableInvoker.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx2.server;
+
+import org.apache.cxf.jaxrs.JAXRSInvoker;
+import org.apache.cxf.jaxrs.impl.AsyncResponseImpl;
+import org.apache.cxf.message.Message;
+
+import io.reactivex.Flowable;
+
+public class FlowableInvoker extends JAXRSInvoker {
+ protected AsyncResponseImpl checkFutureResponse(Message inMessage, Object result) {
+ if (result instanceof Flowable) {
+ final Flowable<?> f = (Flowable<?>)result;
+ final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
+ f.subscribe(v -> asyncResponse.resume(v), t -> handleThrowable(asyncResponse, t));
+ return asyncResponse;
+ }
+ return null;
+ }
+
+ private Object handleThrowable(AsyncResponseImpl asyncResponse, Throwable t) {
+ //TODO: if it is a Cancelation exception => asyncResponse.cancel();
+ asyncResponse.resume(t);
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/JsonStreamingAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/JsonStreamingAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/JsonStreamingAsyncSubscriber.java
new file mode 100644
index 0000000..c809799
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/JsonStreamingAsyncSubscriber.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx2.server;
+
+import javax.ws.rs.container.AsyncResponse;
+
+public class JsonStreamingAsyncSubscriber<T> extends StreamingAsyncSubscriber<T> {
+ public JsonStreamingAsyncSubscriber(AsyncResponse ar) {
+ this(ar, 1000);
+ }
+ public JsonStreamingAsyncSubscriber(AsyncResponse ar, long pollTimeout) {
+ this(ar, pollTimeout, 0);
+ }
+ public JsonStreamingAsyncSubscriber(AsyncResponse ar, long pollTimeout, long asyncTimeout) {
+ super(ar, "[", "]", ",", pollTimeout, asyncTimeout);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/StreamingAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/StreamingAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/StreamingAsyncSubscriber.java
new file mode 100644
index 0000000..f9eafe4
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/StreamingAsyncSubscriber.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx2.server;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.TimeoutHandler;
+
+import org.apache.cxf.common.util.StringUtils;
+import org.apache.cxf.jaxrs.ext.StreamingResponse;
+
+public class StreamingAsyncSubscriber<T> extends AbstractAsyncSubscriber<T> {
+
+ private BlockingQueue<T> queue = new LinkedBlockingQueue<T>();
+ private String openTag;
+ private String closeTag;
+ private String separator;
+ private long pollTimeout;
+ private long asyncTimeout;
+ private volatile boolean completed;
+ private AtomicBoolean firstWriteDone = new AtomicBoolean();
+ public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep) {
+ this(ar, openTag, closeTag, "", 1000);
+ }
+ public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep,
+ long pollTimeout) {
+ this(ar, openTag, closeTag, sep, pollTimeout, 0);
+ }
+ public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep,
+ long pollTimeout, long asyncTimeout) {
+ super(ar);
+ this.openTag = openTag;
+ this.closeTag = closeTag;
+ this.separator = sep;
+ this.pollTimeout = pollTimeout;
+ this.asyncTimeout = 0;
+ if (asyncTimeout > 0) {
+ ar.setTimeout(asyncTimeout, TimeUnit.MILLISECONDS);
+ ar.setTimeoutHandler(new TimeoutHandlerImpl());
+ }
+ }
+ @Override
+ public void onStart() {
+ if (asyncTimeout == 0) {
+ resumeAsyncResponse();
+ }
+ super.onStart();
+ }
+ private void resumeAsyncResponse() {
+ super.resume(new StreamingResponseImpl());
+ }
+ @Override
+ public void onComplete() {
+ completed = true;
+ }
+
+ @Override
+ public void onNext(T bean) {
+ if (asyncTimeout > 0 && getAsyncResponse().isSuspended()) {
+ resumeAsyncResponse();
+ }
+ queue.add(bean);
+ super.requestNext();
+ }
+ private class StreamingResponseImpl implements StreamingResponse<T> {
+
+ @Override
+ public void writeTo(Writer<T> writer) throws IOException {
+ if (openTag != null) {
+ writer.getEntityStream().write(StringUtils.toBytesUTF8(openTag));
+ }
+ while (!completed || !queue.isEmpty()) {
+ try {
+ T bean = queue.poll(pollTimeout, TimeUnit.MILLISECONDS);
+ if (bean != null) {
+ if (firstWriteDone.getAndSet(true)) {
+ writer.getEntityStream().write(StringUtils.toBytesUTF8(separator));
+ }
+ writer.write(bean);
+ }
+ } catch (InterruptedException ex) {
+ // ignore
+ }
+ }
+ if (closeTag != null) {
+ writer.getEntityStream().write(StringUtils.toBytesUTF8(closeTag));
+ }
+
+ }
+
+ }
+ public class TimeoutHandlerImpl implements TimeoutHandler {
+
+ @Override
+ public void handleTimeout(AsyncResponse asyncResponse) {
+ if (queue.isEmpty()) {
+ asyncResponse.setTimeout(asyncTimeout, TimeUnit.MILLISECONDS);
+ } else {
+ resumeAsyncResponse();
+ }
+
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2FlowableTest.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2FlowableTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2FlowableTest.java
new file mode 100644
index 0000000..6e89960
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2FlowableTest.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import javax.ws.rs.NotFoundException;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Invocation;
+import javax.ws.rs.core.GenericType;
+import javax.xml.ws.Holder;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.jaxrs.client.WebClient;
+import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
+import org.apache.cxf.jaxrs.rx2.client.FlowableRxInvoker;
+import org.apache.cxf.jaxrs.rx2.client.FlowableRxInvokerProvider;
+import org.apache.cxf.jaxrs.rx2.client.ObservableRxInvoker;
+import org.apache.cxf.jaxrs.rx2.client.ObservableRxInvokerProvider;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import io.reactivex.Flowable;
+import io.reactivex.Observable;
+
+public class JAXRSRxJava2FlowableTest extends AbstractBusClientServerTestBase {
+ public static final String PORT = RxJava2FlowableServer.PORT;
+ @BeforeClass
+ public static void startServers() throws Exception {
+ AbstractResourceInfo.clearAllMaps();
+ assertTrue("server did not launch correctly",
+ launchServer(RxJava2FlowableServer.class, true));
+ createStaticBus();
+ }
+ @Test
+ public void testGetHelloWorldAsyncText() throws Exception {
+ String address = "http://localhost:" + PORT + "/rx2/flowable/textAsync";
+ WebClient wc = WebClient.create(address);
+ String text = wc.accept("text/plain").get(String.class);
+ assertEquals("Hello, world!", text);
+ }
+
+ @Test
+ public void testGetHelloWorldJson() throws Exception {
+ String address = "http://localhost:" + PORT + "/rx2/flowable/textJson";
+ List<Object> providers = new LinkedList<>();
+ providers.add(new JacksonJsonProvider());
+ providers.add(new FlowableRxInvokerProvider());
+ WebClient wc = WebClient.create(address, providers);
+ Flowable<HelloWorldBean> obs = wc.accept("application/json")
+ .rx(FlowableRxInvoker.class)
+ .get(HelloWorldBean.class);
+
+ Holder<HelloWorldBean> holder = new Holder<HelloWorldBean>();
+ obs.subscribe(v -> {
+ holder.value = v;
+ });
+ Thread.sleep(3000);
+ assertEquals("Hello", holder.value.getGreeting());
+ assertEquals("World", holder.value.getAudience());
+ }
+
+ @Test
+ public void testGetHelloWorldJsonImplicitListAsync() throws Exception {
+ String address = "http://localhost:" + PORT + "/rx2/flowable/textJsonImplicitListAsync";
+ doTestGetHelloWorldJsonList(address);
+ }
+ @Test
+ public void testGetHelloWorldJsonImplicitListAsyncStream() throws Exception {
+ String address = "http://localhost:" + PORT + "/rx2/flowable/textJsonImplicitListAsyncStream";
+ doTestGetHelloWorldJsonList(address);
+ }
+ private void doTestGetHelloWorldJsonList(String address) throws Exception {
+ WebClient wc = WebClient.create(address,
+ Collections.singletonList(new JacksonJsonProvider()));
+ WebClient.getConfig(wc).getHttpConduit().getClient().setReceiveTimeout(10000000);
+ GenericType<List<HelloWorldBean>> genericResponseType = new GenericType<List<HelloWorldBean>>() {
+ };
+
+ List<HelloWorldBean> beans = wc.accept("application/json").get(genericResponseType);
+ assertEquals(2, beans.size());
+ assertEquals("Hello", beans.get(0).getGreeting());
+ assertEquals("World", beans.get(0).getAudience());
+ assertEquals("Ciao", beans.get(1).getGreeting());
+ assertEquals("World", beans.get(1).getAudience());
+ }
+
+ @Test
+ public void testGetHelloWorldAsyncObservable() throws Exception {
+ String address = "http://localhost:" + PORT + "/rx2/flowable/textAsync";
+ WebClient wc = WebClient.create(address,
+ Collections.singletonList(new ObservableRxInvokerProvider()));
+ Observable<String> obs = wc.accept("text/plain")
+ .rx(ObservableRxInvoker.class)
+ .get(String.class);
+
+ Thread.sleep(2000);
+
+ obs.map(
+ s -> {
+ return s + s;
+ })
+ .subscribe(s -> assertDuplicateResponse(s));
+ }
+ @Test
+ public void testGetHelloWorldAsyncObservable404() throws Exception {
+ String address = "http://localhost:" + PORT + "/rx2/flowable/textAsync404";
+ Invocation.Builder b = ClientBuilder.newClient().register(new ObservableRxInvokerProvider())
+ .target(address).request();
+ b.rx(ObservableRxInvoker.class).get(String.class).subscribe(
+ s -> {
+ fail("Exception expected");
+ },
+ t -> validateT((ExecutionException)t));
+ }
+
+ private void validateT(ExecutionException t) {
+ assertTrue(t.getCause() instanceof NotFoundException);
+ }
+ private void assertDuplicateResponse(String s) {
+ assertEquals("Hello, world!Hello, world!", s);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2ObservableTest.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2ObservableTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2ObservableTest.java
new file mode 100644
index 0000000..b9fa5ee
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2ObservableTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.xml.ws.Holder;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.jaxrs.client.WebClient;
+import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
+import org.apache.cxf.jaxrs.rx2.client.ObservableRxInvoker;
+import org.apache.cxf.jaxrs.rx2.client.ObservableRxInvokerProvider;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import io.reactivex.Observable;
+
+public class JAXRSRxJava2ObservableTest extends AbstractBusClientServerTestBase {
+ public static final String PORT = RxJava2ObservableServer.PORT;
+ @BeforeClass
+ public static void startServers() throws Exception {
+ AbstractResourceInfo.clearAllMaps();
+ assertTrue("server did not launch correctly",
+ launchServer(RxJava2ObservableServer.class, true));
+ createStaticBus();
+ }
+ @Test
+ public void testGetHelloWorldJson() throws Exception {
+ String address = "http://localhost:" + PORT + "/rx2/observable/textJson";
+ List<Object> providers = new LinkedList<>();
+ providers.add(new JacksonJsonProvider());
+ providers.add(new ObservableRxInvokerProvider());
+ WebClient wc = WebClient.create(address, providers);
+ Observable<HelloWorldBean> obs = wc.accept("application/json")
+ .rx(ObservableRxInvoker.class)
+ .get(HelloWorldBean.class);
+
+ Holder<HelloWorldBean> holder = new Holder<HelloWorldBean>();
+ obs.subscribe(v -> {
+ holder.value = v;
+ });
+ Thread.sleep(2000);
+ assertEquals("Hello", holder.value.getGreeting());
+ assertEquals("World", holder.value.getAudience());
+ }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2Test.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2Test.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2Test.java
deleted file mode 100644
index ded2799..0000000
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2Test.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cxf.systest.jaxrs.reactive;
-
-import java.util.LinkedList;
-import java.util.List;
-
-import javax.xml.ws.Holder;
-
-import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
-
-import org.apache.cxf.jaxrs.client.WebClient;
-import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
-import org.apache.cxf.jaxrs.rx2.client.ObservableRxInvoker;
-import org.apache.cxf.jaxrs.rx2.client.ObservableRxInvokerProvider;
-import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
-
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import io.reactivex.Observable;
-
-public class JAXRSRxJava2Test extends AbstractBusClientServerTestBase {
- public static final String PORT = RxJava2Server.PORT;
- @BeforeClass
- public static void startServers() throws Exception {
- AbstractResourceInfo.clearAllMaps();
- assertTrue("server did not launch correctly",
- launchServer(RxJava2Server.class, true));
- createStaticBus();
- }
- @Test
- public void testGetHelloWorldJson() throws Exception {
- String address = "http://localhost:" + PORT + "/observable2/textJson";
- List<Object> providers = new LinkedList<>();
- providers.add(new JacksonJsonProvider());
- providers.add(new ObservableRxInvokerProvider());
- WebClient wc = WebClient.create(address, providers);
- Observable<HelloWorldBean> obs = wc.accept("application/json")
- .rx(ObservableRxInvoker.class)
- .get(HelloWorldBean.class);
-
- Holder<HelloWorldBean> holder = new Holder<HelloWorldBean>();
- obs.subscribe(v -> {
- holder.value = v;
- });
- Thread.sleep(3000);
- assertEquals("Hello", holder.value.getGreeting());
- assertEquals("World", holder.value.getAudience());
- }
-}
http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJavaObservableTest.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJavaObservableTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJavaObservableTest.java
new file mode 100644
index 0000000..3412001
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJavaObservableTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import java.util.Collections;
+import java.util.List;
+
+import javax.ws.rs.core.GenericType;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.jaxrs.client.WebClient;
+import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class JAXRSRxJavaObservableTest extends AbstractBusClientServerTestBase {
+ public static final String PORT = RxJavaObservableServer.PORT;
+ @BeforeClass
+ public static void startServers() throws Exception {
+ AbstractResourceInfo.clearAllMaps();
+ assertTrue("server did not launch correctly",
+ launchServer(RxJavaObservableServer.class, true));
+ createStaticBus();
+ }
+ @Test
+ public void testGetHelloWorldText() throws Exception {
+ String address = "http://localhost:" + PORT + "/rx/text";
+ WebClient wc = WebClient.create(address);
+ String text = wc.accept("text/plain").get(String.class);
+ assertEquals("Hello, world!", text);
+ }
+
+ @Test
+ public void testGetHelloWorldJson() throws Exception {
+ String address = "http://localhost:" + PORT + "/rx/textJson";
+ WebClient wc = WebClient.create(address,
+ Collections.singletonList(new JacksonJsonProvider()));
+ HelloWorldBean bean = wc.accept("application/json").get(HelloWorldBean.class);
+ assertEquals("Hello", bean.getGreeting());
+ assertEquals("World", bean.getAudience());
+ }
+ @Test
+ public void testGetHelloWorldJsonList() throws Exception {
+ String address = "http://localhost:" + PORT + "/rx/textJsonList";
+ doTestGetHelloWorldJsonList(address);
+ }
+ private void doTestGetHelloWorldJsonList(String address) throws Exception {
+ WebClient wc = WebClient.create(address,
+ Collections.singletonList(new JacksonJsonProvider()));
+ WebClient.getConfig(wc).getHttpConduit().getClient().setReceiveTimeout(10000000);
+ GenericType<List<HelloWorldBean>> genericResponseType = new GenericType<List<HelloWorldBean>>() {
+ };
+
+ List<HelloWorldBean> beans = wc.accept("application/json").get(genericResponseType);
+ assertEquals(2, beans.size());
+ assertEquals("Hello", beans.get(0).getGreeting());
+ assertEquals("World", beans.get(0).getAudience());
+ assertEquals("Ciao", beans.get(1).getGreeting());
+ assertEquals("World", beans.get(1).getAudience());
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJavaTest.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJavaTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJavaTest.java
deleted file mode 100644
index 9f197d8..0000000
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJavaTest.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cxf.systest.jaxrs.reactive;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-
-import javax.ws.rs.NotFoundException;
-import javax.ws.rs.client.ClientBuilder;
-import javax.ws.rs.client.Invocation;
-import javax.ws.rs.core.GenericType;
-
-import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
-
-import org.apache.cxf.jaxrs.client.WebClient;
-import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
-import org.apache.cxf.jaxrs.rx.client.ObservableRxInvoker;
-import org.apache.cxf.jaxrs.rx.client.ObservableRxInvokerProvider;
-import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
-
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import rx.Observable;
-
-public class JAXRSRxJavaTest extends AbstractBusClientServerTestBase {
- public static final String PORT = RxJavaServer.PORT;
- @BeforeClass
- public static void startServers() throws Exception {
- AbstractResourceInfo.clearAllMaps();
- assertTrue("server did not launch correctly",
- launchServer(RxJavaServer.class, true));
- createStaticBus();
- }
- @Test
- public void testGetHelloWorldText() throws Exception {
- String address = "http://localhost:" + PORT + "/observable/text";
- WebClient wc = WebClient.create(address);
- String text = wc.accept("text/plain").get(String.class);
- assertEquals("Hello, world!", text);
- }
- @Test
- public void testGetHelloWorldAsyncText() throws Exception {
- String address = "http://localhost:" + PORT + "/observable/textAsync";
- WebClient wc = WebClient.create(address);
- String text = wc.accept("text/plain").get(String.class);
- assertEquals("Hello, world!", text);
- }
-
- @Test
- public void testGetHelloWorldJson() throws Exception {
- String address = "http://localhost:" + PORT + "/observable/textJson";
- WebClient wc = WebClient.create(address,
- Collections.singletonList(new JacksonJsonProvider()));
- HelloWorldBean bean = wc.accept("application/json").get(HelloWorldBean.class);
- assertEquals("Hello", bean.getGreeting());
- assertEquals("World", bean.getAudience());
- }
- @Test
- public void testGetHelloWorldJsonList() throws Exception {
- String address = "http://localhost:" + PORT + "/observable/textJsonList";
- doTestGetHelloWorldJsonList(address);
- }
- @Test
- public void testGetHelloWorldJsonImplicitListAsync() throws Exception {
- String address = "http://localhost:" + PORT + "/observable/textJsonImplicitListAsync";
- doTestGetHelloWorldJsonList(address);
- }
- @Test
- public void testGetHelloWorldJsonImplicitListAsyncStream() throws Exception {
- String address = "http://localhost:" + PORT + "/observable/textJsonImplicitListAsyncStream";
- doTestGetHelloWorldJsonList(address);
- }
- private void doTestGetHelloWorldJsonList(String address) throws Exception {
- WebClient wc = WebClient.create(address,
- Collections.singletonList(new JacksonJsonProvider()));
- WebClient.getConfig(wc).getHttpConduit().getClient().setReceiveTimeout(10000000);
- GenericType<List<HelloWorldBean>> genericResponseType = new GenericType<List<HelloWorldBean>>() {
- };
-
- List<HelloWorldBean> beans = wc.accept("application/json").get(genericResponseType);
- assertEquals(2, beans.size());
- assertEquals("Hello", beans.get(0).getGreeting());
- assertEquals("World", beans.get(0).getAudience());
- assertEquals("Ciao", beans.get(1).getGreeting());
- assertEquals("World", beans.get(1).getAudience());
- }
-
- @Test
- public void testGetHelloWorldAsyncObservable() throws Exception {
- String address = "http://localhost:" + PORT + "/observable/textAsync";
- WebClient wc = WebClient.create(address,
- Collections.singletonList(new ObservableRxInvokerProvider()));
- Observable<String> obs = wc.accept("text/plain")
- .rx(ObservableRxInvoker.class)
- .get(String.class);
- obs.map(s -> {
- return s + s;
- });
-
- Thread.sleep(3000);
-
- obs.subscribe(s -> assertDuplicateResponse(s));
- }
- @Test
- public void testGetHelloWorldAsyncObservable404() throws Exception {
- String address = "http://localhost:" + PORT + "/observable/textAsync404";
- Invocation.Builder b = ClientBuilder.newClient().register(new ObservableRxInvokerProvider())
- .target(address).request();
- b.rx(ObservableRxInvoker.class).get(String.class).subscribe(
- s -> {
- fail("Exception expected");
- },
- t -> validateT((ExecutionException)t));
- }
-
- private void validateT(ExecutionException t) {
- assertTrue(t.getCause() instanceof NotFoundException);
- }
- private void assertDuplicateResponse(String s) {
- assertEquals("Hello, world!Hello, world!", s);
- }
-}
http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java
new file mode 100644
index 0000000..fe41958
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import java.util.Collections;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.ext.logging.LoggingOutInterceptor;
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
+import org.apache.cxf.jaxrs.provider.StreamingResponseProvider;
+import org.apache.cxf.jaxrs.rx2.server.FlowableInvoker;
+import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
+
+
+/**
+ * Test server which publishes {@link RxJava2FlowableService} at
+ * {@code http://localhost:PORT/} with a {@link FlowableInvoker}, a Jackson JSON
+ * provider and a JSON producing {@link StreamingResponseProvider}.
+ */
+public class RxJava2FlowableServer extends AbstractBusTestServerBase {
+    public static final String PORT = allocatePort(RxJava2FlowableServer.class);
+
+    org.apache.cxf.endpoint.Server server;
+
+    public RxJava2FlowableServer() {
+    }
+
+    /** Configures the JAX-RS endpoint and starts it. */
+    protected void run() {
+        // Make sure default JSONProvider is not loaded
+        Bus defaultBus = BusFactory.getDefaultBus();
+        defaultBus.setProperty("skip.default.json.provider.registration", true);
+
+        JAXRSServerFactoryBean factory = new JAXRSServerFactoryBean();
+        factory.setInvoker(new FlowableInvoker());
+        factory.setProvider(new JacksonJsonProvider());
+
+        StreamingResponseProvider<HelloWorldBean> streamingProvider =
+            new StreamingResponseProvider<HelloWorldBean>();
+        streamingProvider.setProduceMediaTypes(Collections.singletonList("application/json"));
+        factory.setProvider(streamingProvider);
+
+        factory.getOutInterceptors().add(new LoggingOutInterceptor());
+        factory.setResourceClasses(RxJava2FlowableService.class);
+        SingletonResourceProvider resourceProvider =
+            new SingletonResourceProvider(new RxJava2FlowableService(), true);
+        factory.setResourceProvider(RxJava2FlowableService.class, resourceProvider);
+        factory.setAddress("http://localhost:" + PORT + "/");
+        server = factory.create();
+    }
+
+    /** Stops and destroys the endpoint created by {@link #run()}. */
+    public void tearDown() throws Exception {
+        server.stop();
+        server.destroy();
+        server = null;
+    }
+
+    /** Starts the server standalone; exits with -1 if it cannot be started. */
+    public static void main(String[] args) {
+        try {
+            new RxJava2FlowableServer().start();
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            System.exit(-1);
+        } finally {
+            System.out.println("done!");
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableService.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableService.java
new file mode 100644
index 0000000..7812977
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableService.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+
+import org.apache.cxf.jaxrs.rx2.server.AbstractAsyncSubscriber;
+import org.apache.cxf.jaxrs.rx2.server.JsonStreamingAsyncSubscriber;
+
+import io.reactivex.Flowable;
+import io.reactivex.schedulers.Schedulers;
+
+
+/**
+ * Test resource exposing RxJava2 {@link Flowable} based endpoints under {@code /rx2/flowable}.
+ */
+@Path("/rx2/flowable")
+public class RxJava2FlowableService {
+
+    /**
+     * Returns a {@link Flowable} that emits a single default {@link HelloWorldBean}.
+     */
+    @GET
+    @Produces("application/json")
+    @Path("textJson")
+    public Flowable<HelloWorldBean> getJson() {
+        return Flowable.just(new HelloWorldBean());
+    }
+
+    /**
+     * Starts a new thread that sleeps for one second and then subscribes a
+     * {@link ListAsyncSubscriber} to a two-bean {@link Flowable}; the subscriber
+     * hands the collected list to {@code resume()} when the flow completes.
+     *
+     * @param ar the suspended response passed on to the subscriber
+     */
+    @GET
+    @Produces("application/json")
+    @Path("textJsonImplicitListAsync")
+    public void getJsonImplicitListAsync(@Suspended final AsyncResponse ar) {
+        final HelloWorldBean bean1 = new HelloWorldBean();
+        final HelloWorldBean bean2 = new HelloWorldBean("Ciao");
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException ex) {
+                    // Keep the interrupt status visible instead of swallowing it. The
+                    // subscription below still runs so the subscriber can call resume().
+                    Thread.currentThread().interrupt();
+                }
+                Flowable.just(bean1, bean2).subscribe(new ListAsyncSubscriber<HelloWorldBean>(ar));
+            }
+        }).start();
+    }
+
+    /**
+     * Maps two strings to {@link HelloWorldBean}s, subscribes on the computation
+     * scheduler and hands the items to a {@link JsonStreamingAsyncSubscriber}.
+     *
+     * @param ar the suspended response passed on to the subscriber
+     */
+    @GET
+    @Produces("application/json")
+    @Path("textJsonImplicitListAsyncStream")
+    public void getJsonImplicitListStreamingAsync(@Suspended final AsyncResponse ar) {
+        Flowable.just("Hello", "Ciao")
+            .map(s -> new HelloWorldBean(s))
+            .subscribeOn(Schedulers.computation())
+            .subscribe(new JsonStreamingAsyncSubscriber<HelloWorldBean>(ar));
+    }
+
+    /**
+     * Builds the text "Hello, world!" through a {@link Flowable} and passes it to a
+     * {@link StringAsyncSubscriber}.
+     *
+     * @param ar the suspended response passed on to the subscriber
+     */
+    @GET
+    @Produces("text/plain")
+    @Path("textAsync")
+    public void getTextAsync(@Suspended final AsyncResponse ar) {
+        Flowable.just("Hello, ").map(s -> s + "world!")
+            .subscribe(new StringAsyncSubscriber(ar));
+    }
+
+    /**
+     * Concatenates every emitted string and passes the result to {@code resume()}
+     * once the flow completes.
+     */
+    private static class StringAsyncSubscriber extends AbstractAsyncSubscriber<String> {
+
+        private final StringBuilder sb = new StringBuilder();
+
+        StringAsyncSubscriber(AsyncResponse ar) {
+            super(ar);
+        }
+
+        @Override
+        public void onComplete() {
+            super.resume(sb.toString());
+        }
+
+        @Override
+        public void onNext(String s) {
+            sb.append(s);
+            // NOTE(review): presumably asks upstream for the next item; requestNext()
+            // is defined in AbstractAsyncSubscriber - confirm there.
+            super.requestNext();
+        }
+
+    }
+
+    /**
+     * Collects every emitted item into a list and passes that list to
+     * {@code resume()} once the flow completes.
+     */
+    private static class ListAsyncSubscriber<T> extends AbstractAsyncSubscriber<T> {
+
+        private final List<T> beans = new LinkedList<T>();
+
+        ListAsyncSubscriber(AsyncResponse ar) {
+            super(ar);
+        }
+
+        @Override
+        public void onComplete() {
+            super.resume(beans);
+        }
+
+        @Override
+        public void onNext(T bean) {
+            beans.add(bean);
+            // NOTE(review): presumably asks upstream for the next item; requestNext()
+            // is defined in AbstractAsyncSubscriber - confirm there.
+            super.requestNext();
+        }
+
+    }
+}
+
+
http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableServer.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableServer.java
new file mode 100644
index 0000000..48df030
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableServer.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.ext.logging.LoggingOutInterceptor;
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
+import org.apache.cxf.jaxrs.rx2.server.ObservableInvoker;
+import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
+
+
+/**
+ * Test server that publishes {@link RxJava2ObservableService} at
+ * {@code http://localhost:<PORT>/} with an {@link ObservableInvoker} and a
+ * Jackson JSON provider.
+ */
+public class RxJava2ObservableServer extends AbstractBusTestServerBase {
+    public static final String PORT = allocatePort(RxJava2ObservableServer.class);
+
+    org.apache.cxf.endpoint.Server server;
+
+    public RxJava2ObservableServer() {
+    }
+
+    /**
+     * Configures the JAX-RS server factory and stores the created endpoint in {@code server}.
+     */
+    protected void run() {
+        Bus bus = BusFactory.getDefaultBus();
+        // Make sure default JSONProvider is not loaded
+        bus.setProperty("skip.default.json.provider.registration", true);
+        JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
+        sf.setInvoker(new ObservableInvoker());
+        sf.setProvider(new JacksonJsonProvider());
+        sf.getOutInterceptors().add(new LoggingOutInterceptor());
+        sf.setResourceClasses(RxJava2ObservableService.class);
+        sf.setResourceProvider(RxJava2ObservableService.class,
+                               new SingletonResourceProvider(new RxJava2ObservableService(), true));
+        sf.setAddress("http://localhost:" + PORT + "/");
+        server = sf.create();
+    }
+
+    /**
+     * Stops and destroys the endpoint created by {@link #run()}, if there is one.
+     *
+     * @throws Exception declared for callers; nothing here throws a checked exception directly
+     */
+    public void tearDown() throws Exception {
+        // The field is only assigned at the end of run() and is cleared below, so it is
+        // null after a failed start or a repeated tearDown(); skip instead of throwing NPE.
+        if (server != null) {
+            server.stop();
+            server.destroy();
+            server = null;
+        }
+    }
+
+    /**
+     * Starts the server standalone; exits with status -1 if start-up throws.
+     */
+    public static void main(String[] args) {
+        try {
+            RxJava2ObservableServer s = new RxJava2ObservableServer();
+            s.start();
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            System.exit(-1);
+        } finally {
+            System.out.println("done!");
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java
index 28d053e..abb6e72 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java
@@ -27,7 +27,7 @@ import javax.ws.rs.Produces;
import io.reactivex.Observable;
-@Path("/observable2")
+@Path("/rx2/observable")
public class RxJava2ObservableService {
@@ -37,7 +37,6 @@ public class RxJava2ObservableService {
public Observable<HelloWorldBean> getJson() {
return Observable.just(new HelloWorldBean());
}
-
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2Server.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2Server.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2Server.java
deleted file mode 100644
index f9ab3ae..0000000
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2Server.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cxf.systest.jaxrs.reactive;
-
-import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
-
-import org.apache.cxf.Bus;
-import org.apache.cxf.BusFactory;
-import org.apache.cxf.ext.logging.LoggingOutInterceptor;
-import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
-import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
-import org.apache.cxf.jaxrs.rx2.server.ObservableInvoker;
-import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
-
-
-public class RxJava2Server extends AbstractBusTestServerBase {
- public static final String PORT = allocatePort(RxJava2Server.class);
-
- org.apache.cxf.endpoint.Server server;
- public RxJava2Server() {
- }
-
- protected void run() {
- Bus bus = BusFactory.getDefaultBus();
- // Make sure default JSONProvider is not loaded
- bus.setProperty("skip.default.json.provider.registration", true);
- JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
- sf.setInvoker(new ObservableInvoker());
- sf.setProvider(new JacksonJsonProvider());
- sf.getOutInterceptors().add(new LoggingOutInterceptor());
- sf.setResourceClasses(RxJava2ObservableService.class);
- sf.setResourceProvider(RxJava2ObservableService.class,
- new SingletonResourceProvider(new RxJava2ObservableService(), true));
- sf.setAddress("http://localhost:" + PORT + "/");
- server = sf.create();
- }
-
- public void tearDown() throws Exception {
- server.stop();
- server.destroy();
- server = null;
- }
-
- public static void main(String[] args) {
- try {
- RxJava2Server s = new RxJava2Server();
- s.start();
- } catch (Exception ex) {
- ex.printStackTrace();
- System.exit(-1);
- } finally {
- System.out.println("done!");
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaObservableServer.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaObservableServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaObservableServer.java
new file mode 100644
index 0000000..85d576d
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaObservableServer.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import java.util.Collections;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.ext.logging.LoggingOutInterceptor;
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
+import org.apache.cxf.jaxrs.provider.StreamingResponseProvider;
+import org.apache.cxf.jaxrs.rx.server.ObservableInvoker;
+import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
+
+
+/**
+ * Test server that publishes {@link RxJavaObservableService} at
+ * {@code http://localhost:<PORT>/} with an {@link ObservableInvoker}, a Jackson JSON
+ * provider and a {@link StreamingResponseProvider} producing {@code application/json}.
+ */
+public class RxJavaObservableServer extends AbstractBusTestServerBase {
+    public static final String PORT = allocatePort(RxJavaObservableServer.class);
+
+    org.apache.cxf.endpoint.Server server;
+
+    public RxJavaObservableServer() {
+    }
+
+    /**
+     * Configures the JAX-RS server factory and stores the created endpoint in {@code server}.
+     */
+    protected void run() {
+        Bus bus = BusFactory.getDefaultBus();
+        // Make sure default JSONProvider is not loaded
+        bus.setProperty("skip.default.json.provider.registration", true);
+        JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
+        sf.setInvoker(new ObservableInvoker());
+        sf.setProvider(new JacksonJsonProvider());
+        StreamingResponseProvider<HelloWorldBean> streamProvider = new StreamingResponseProvider<>();
+        streamProvider.setProduceMediaTypes(Collections.singletonList("application/json"));
+        sf.setProvider(streamProvider);
+        sf.getOutInterceptors().add(new LoggingOutInterceptor());
+        sf.setResourceClasses(RxJavaObservableService.class);
+        sf.setResourceProvider(RxJavaObservableService.class,
+                               new SingletonResourceProvider(new RxJavaObservableService(), true));
+        sf.setAddress("http://localhost:" + PORT + "/");
+        server = sf.create();
+    }
+
+    /**
+     * Stops and destroys the endpoint created by {@link #run()}, if there is one.
+     *
+     * @throws Exception declared for callers; nothing here throws a checked exception directly
+     */
+    public void tearDown() throws Exception {
+        // The field is only assigned at the end of run() and is cleared below, so it is
+        // null after a failed start or a repeated tearDown(); skip instead of throwing NPE.
+        if (server != null) {
+            server.stop();
+            server.destroy();
+            server = null;
+        }
+    }
+
+    /**
+     * Starts the server standalone; exits with status -1 if start-up throws.
+     */
+    public static void main(String[] args) {
+        try {
+            RxJavaObservableServer s = new RxJavaObservableServer();
+            s.start();
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            System.exit(-1);
+        } finally {
+            System.out.println("done!");
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaObservableService.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaObservableService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaObservableService.java
index de0f91f..a06d207 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaObservableService.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaObservableService.java
@@ -26,18 +26,11 @@ import java.util.List;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
-import javax.ws.rs.container.AsyncResponse;
-import javax.ws.rs.container.Suspended;
-
-import org.apache.cxf.jaxrs.rx.server.AbstractAsyncSubscriber;
-import org.apache.cxf.jaxrs.rx.server.JsonStreamingAsyncSubscriber;
-import org.apache.cxf.jaxrs.rx.server.ListAsyncSubscriber;
import rx.Observable;
-import rx.schedulers.Schedulers;
-@Path("/observable")
+@Path("/rx")
public class RxJavaObservableService {
@GET
@@ -48,15 +41,6 @@ public class RxJavaObservableService {
}
@GET
- @Produces("text/plain")
- @Path("textAsync")
- public void getTextAsync(@Suspended final AsyncResponse ar) {
- Observable.just("Hello, ").map(s -> s + "world!")
- .subscribe(new StringAsyncSubscriber(ar));
-
- }
-
- @GET
@Produces("application/json")
@Path("textJson")
public Observable<HelloWorldBean> getJson() {
@@ -65,33 +49,6 @@ public class RxJavaObservableService {
@GET
@Produces("application/json")
- @Path("textJsonImplicitListAsync")
- public void getJsonImplicitListAsync(@Suspended AsyncResponse ar) {
- final HelloWorldBean bean1 = new HelloWorldBean();
- final HelloWorldBean bean2 = new HelloWorldBean("Ciao");
- new Thread(new Runnable() {
- public void run() {
- try {
- Thread.sleep(2000);
- } catch (InterruptedException ex) {
- // ignore
- }
- Observable.just(bean1, bean2).subscribe(new ListAsyncSubscriber<HelloWorldBean>(ar));
- }
- }).start();
-
- }
- @GET
- @Produces("application/json")
- @Path("textJsonImplicitListAsyncStream")
- public void getJsonImplicitListStreamingAsync(@Suspended AsyncResponse ar) {
- Observable.just("Hello", "Ciao")
- .map(s -> new HelloWorldBean(s))
- .subscribeOn(Schedulers.computation())
- .subscribe(new JsonStreamingAsyncSubscriber<HelloWorldBean>(ar));
- }
- @GET
- @Produces("application/json")
@Path("textJsonList")
public Observable<List<HelloWorldBean>> getJsonList() {
HelloWorldBean bean1 = new HelloWorldBean();
@@ -100,23 +57,7 @@ public class RxJavaObservableService {
return Observable.just(Arrays.asList(bean1, bean2));
}
- private class StringAsyncSubscriber extends AbstractAsyncSubscriber<String> {
-
- private StringBuilder sb = new StringBuilder();
- StringAsyncSubscriber(AsyncResponse ar) {
- super(ar);
- }
- @Override
- public void onCompleted() {
- super.resume(sb.toString());
- }
-
- @Override
- public void onNext(String s) {
- sb.append(s);
- }
-
- }
+
}