You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by se...@apache.org on 2017/08/30 13:02:07 UTC

[2/2] cxf git commit: [CXF-7487] More work around supporting RxJava2

[CXF-7487] More work around supporting RxJava2


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/253900af
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/253900af
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/253900af

Branch: refs/heads/master
Commit: 253900af377eff63e8010d86e2375e4b5c426ed6
Parents: 0872b79
Author: Sergey Beryozkin <sb...@gmail.com>
Authored: Wed Aug 30 14:01:51 2017 +0100
Committer: Sergey Beryozkin <sb...@gmail.com>
Committed: Wed Aug 30 14:01:51 2017 +0100

----------------------------------------------------------------------
 .../cxf/jaxrs/swagger/SwaggerUiResolver.java    |  16 +-
 .../rx/server/AbstractAsyncSubscriber.java      |  56 ------
 .../rx/server/JsonStreamingAsyncSubscriber.java |  34 ----
 .../jaxrs/rx/server/ListAsyncSubscriber.java    |  42 -----
 .../rx/server/StreamingAsyncSubscriber.java     | 124 -------------
 .../cxf/jaxrs/rx2/client/FlowableRxInvoker.java | 107 +++++++++++
 .../jaxrs/rx2/client/FlowableRxInvokerImpl.java | 180 +++++++++++++++++++
 .../rx2/client/FlowableRxInvokerProvider.java   |  45 +++++
 .../rx2/server/AbstractAsyncSubscriber.java     |  65 +++++++
 .../cxf/jaxrs/rx2/server/FlowableInvoker.java   |  43 +++++
 .../server/JsonStreamingAsyncSubscriber.java    |  34 ++++
 .../rx2/server/StreamingAsyncSubscriber.java    | 126 +++++++++++++
 .../reactive/JAXRSRxJava2FlowableTest.java      | 146 +++++++++++++++
 .../reactive/JAXRSRxJava2ObservableTest.java    |  68 +++++++
 .../jaxrs/reactive/JAXRSRxJava2Test.java        |  68 -------
 .../reactive/JAXRSRxJavaObservableTest.java     |  83 +++++++++
 .../systest/jaxrs/reactive/JAXRSRxJavaTest.java | 141 ---------------
 .../jaxrs/reactive/RxJava2FlowableServer.java   |  79 ++++++++
 .../jaxrs/reactive/RxJava2FlowableService.java  | 126 +++++++++++++
 .../jaxrs/reactive/RxJava2ObservableServer.java |  73 ++++++++
 .../reactive/RxJava2ObservableService.java      |   3 +-
 .../systest/jaxrs/reactive/RxJava2Server.java   |  73 --------
 .../jaxrs/reactive/RxJavaObservableServer.java  |  79 ++++++++
 .../jaxrs/reactive/RxJavaObservableService.java |  63 +------
 .../systest/jaxrs/reactive/RxJavaServer.java    |  79 --------
 25 files changed, 1265 insertions(+), 688 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/rt/rs/description-swagger/src/main/java/org/apache/cxf/jaxrs/swagger/SwaggerUiResolver.java
----------------------------------------------------------------------
diff --git a/rt/rs/description-swagger/src/main/java/org/apache/cxf/jaxrs/swagger/SwaggerUiResolver.java b/rt/rs/description-swagger/src/main/java/org/apache/cxf/jaxrs/swagger/SwaggerUiResolver.java
index 60f7d20..0e5b90a 100644
--- a/rt/rs/description-swagger/src/main/java/org/apache/cxf/jaxrs/swagger/SwaggerUiResolver.java
+++ b/rt/rs/description-swagger/src/main/java/org/apache/cxf/jaxrs/swagger/SwaggerUiResolver.java
@@ -52,16 +52,16 @@ public class SwaggerUiResolver {
                         return root;
                     }
                 }
-            } else {
-                Enumeration<URL> urls = cl.getResources(UI_RESOURCES_ROOT_START);
-                while (urls.hasMoreElements()) {
-                    String urlStr = urls.nextElement().toString().replace(UI_RESOURCES_ROOT_START, "");     
-                    String root = checkUiRoot(urlStr, swaggerUiMavenGroupAndArtifact, swaggerUiVersion);
-                    if (root != null) {
-                        return root;
-                    }
+            }
+            Enumeration<URL> urls = cl.getResources(UI_RESOURCES_ROOT_START);
+            while (urls.hasMoreElements()) {
+                String urlStr = urls.nextElement().toString().replace(UI_RESOURCES_ROOT_START, "");     
+                String root = checkUiRoot(urlStr, swaggerUiMavenGroupAndArtifact, swaggerUiVersion);
+                if (root != null) {
+                    return root;
                 }
             }
+            
         } catch (Throwable ex) {
             // ignore
         }

http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/AbstractAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/AbstractAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/AbstractAsyncSubscriber.java
deleted file mode 100644
index 5e27454..0000000
--- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/AbstractAsyncSubscriber.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.rx.server;
-
-import java.util.List;
-
-import javax.ws.rs.container.AsyncResponse;
-
-import org.apache.cxf.jaxrs.ext.StreamingResponse;
-
-import rx.Subscriber;
-
-public abstract class AbstractAsyncSubscriber<T> extends Subscriber<T> {
-
-    private AsyncResponse ar;
-
-    protected AbstractAsyncSubscriber(AsyncResponse ar) {
-        this.ar = ar;
-    }
-    public void resume(T response) {
-        ar.resume(response);
-    }
-
-    public void resume(List<T> response) {
-        ar.resume(response);
-    }
-
-    public void resume(StreamingResponse<T> response) {
-        ar.resume(response);
-    }
-
-    @Override
-    public void onError(Throwable t) {
-        ar.resume(t);
-    }
-
-    protected AsyncResponse getAsyncResponse() {
-        return ar;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/JsonStreamingAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/JsonStreamingAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/JsonStreamingAsyncSubscriber.java
deleted file mode 100644
index f90b4b1..0000000
--- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/JsonStreamingAsyncSubscriber.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.rx.server;
-
-import javax.ws.rs.container.AsyncResponse;
-
-public class JsonStreamingAsyncSubscriber<T> extends StreamingAsyncSubscriber<T> {
-    public JsonStreamingAsyncSubscriber(AsyncResponse ar) {
-        this(ar, 1000);
-    }
-    public JsonStreamingAsyncSubscriber(AsyncResponse ar, long pollTimeout) {
-        this(ar, pollTimeout, 0);
-    }
-    public JsonStreamingAsyncSubscriber(AsyncResponse ar, long pollTimeout, long asyncTimeout) {
-        super(ar, "[", "]", ",", pollTimeout, asyncTimeout);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/ListAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/ListAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/ListAsyncSubscriber.java
deleted file mode 100644
index cdbc383..0000000
--- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/ListAsyncSubscriber.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.rx.server;
-
-import java.util.LinkedList;
-import java.util.List;
-
-import javax.ws.rs.container.AsyncResponse;
-
-public class ListAsyncSubscriber<T> extends AbstractAsyncSubscriber<T> {
-
-    private List<T> beans = new LinkedList<T>();
-    public ListAsyncSubscriber(AsyncResponse ar) {
-        super(ar);
-    }
-    @Override
-    public void onCompleted() {
-        super.resume(beans);
-    }
-
-    @Override
-    public void onNext(T bean) {
-        beans.add(bean);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/StreamingAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/StreamingAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/StreamingAsyncSubscriber.java
deleted file mode 100644
index 1571973..0000000
--- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/StreamingAsyncSubscriber.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.rx.server;
-
-import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.ws.rs.container.AsyncResponse;
-import javax.ws.rs.container.TimeoutHandler;
-
-import org.apache.cxf.common.util.StringUtils;
-import org.apache.cxf.jaxrs.ext.StreamingResponse;
-
-public class StreamingAsyncSubscriber<T> extends AbstractAsyncSubscriber<T> {
-
-    private BlockingQueue<T> queue = new LinkedBlockingQueue<T>();
-    private String openTag;
-    private String closeTag;
-    private String separator;
-    private long pollTimeout;
-    private long asyncTimeout;
-    private volatile boolean completed;
-    private AtomicBoolean firstWriteDone = new AtomicBoolean();
-    public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep) {
-        this(ar, openTag, closeTag, "", 1000);
-    }
-    public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep,
-                                    long pollTimeout) {
-        this(ar, openTag, closeTag, sep, pollTimeout, 0);
-    }
-    public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep,
-                                    long pollTimeout, long asyncTimeout) {
-        super(ar);
-        this.openTag = openTag;
-        this.closeTag = closeTag;
-        this.separator = sep;
-        this.pollTimeout = pollTimeout;
-        this.asyncTimeout = 0;
-        if (asyncTimeout > 0) {
-            ar.setTimeout(asyncTimeout, TimeUnit.MILLISECONDS);
-            ar.setTimeoutHandler(new TimeoutHandlerImpl());
-        }
-    }
-    @Override
-    public void onStart() {
-        if (asyncTimeout == 0) {
-            resumeAsyncResponse();
-        }
-    }
-    private void resumeAsyncResponse() {
-        super.resume(new StreamingResponseImpl());
-    }
-    @Override
-    public void onCompleted() {
-        completed = true;
-    }
-
-    @Override
-    public void onNext(T bean) {
-        if (asyncTimeout > 0 && getAsyncResponse().isSuspended()) {
-            resumeAsyncResponse();
-        }
-        queue.add(bean);
-    }
-    private class StreamingResponseImpl implements StreamingResponse<T> {
-
-        @Override
-        public void writeTo(Writer<T> writer) throws IOException {
-            if (openTag != null) {
-                writer.getEntityStream().write(StringUtils.toBytesUTF8(openTag));
-            }
-            while (!completed || !queue.isEmpty()) {
-                try {
-                    T bean = queue.poll(pollTimeout, TimeUnit.MILLISECONDS);
-                    if (bean != null) {
-                        if (firstWriteDone.getAndSet(true)) {
-                            writer.getEntityStream().write(StringUtils.toBytesUTF8(separator));
-                        }
-                        writer.write(bean);
-                    }
-                } catch (InterruptedException ex) {
-                    // ignore
-                }
-            }
-            if (closeTag != null) {
-                writer.getEntityStream().write(StringUtils.toBytesUTF8(closeTag));
-            }
-
-        }
-
-    }
-    public class TimeoutHandlerImpl implements TimeoutHandler {
-
-        @Override
-        public void handleTimeout(AsyncResponse asyncResponse) {
-            if (queue.isEmpty()) {
-                asyncResponse.setTimeout(asyncTimeout, TimeUnit.MILLISECONDS);
-            } else {
-                resumeAsyncResponse();
-            }
-
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvoker.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvoker.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvoker.java
new file mode 100644
index 0000000..b306461
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvoker.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx2.client;
+
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.RxInvoker;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.Response;
+
+import io.reactivex.Flowable;
+
+
+@SuppressWarnings("rawtypes")
+public interface FlowableRxInvoker extends RxInvoker<Flowable> {
+
+    @Override
+    Flowable<Response> get();
+
+    @Override
+    <T> Flowable<T> get(Class<T> responseType);
+
+    @Override
+    <T> Flowable<T> get(GenericType<T> responseType);
+
+    @Override
+    Flowable<Response> put(Entity<?> entity);
+
+    @Override
+    <T> Flowable<T> put(Entity<?> entity, Class<T> clazz);
+
+    @Override
+    <T> Flowable<T> put(Entity<?> entity, GenericType<T> type);
+
+    @Override
+    Flowable<Response> post(Entity<?> entity);
+
+    @Override
+    <T> Flowable<T> post(Entity<?> entity, Class<T> clazz);
+
+    @Override
+    <T> Flowable<T> post(Entity<?> entity, GenericType<T> type);
+
+    @Override
+    Flowable<Response> delete();
+
+    @Override
+    <T> Flowable<T> delete(Class<T> responseType);
+
+    @Override
+    <T> Flowable<T> delete(GenericType<T> responseType);
+
+    @Override
+    Flowable<Response> head();
+
+    @Override
+    Flowable<Response> options();
+
+    @Override
+    <T> Flowable<T> options(Class<T> responseType);
+
+    @Override
+    <T> Flowable<T> options(GenericType<T> responseType);
+
+    @Override
+    Flowable<Response> trace();
+
+    @Override
+    <T> Flowable<T> trace(Class<T> responseType);
+
+    @Override
+    <T> Flowable<T> trace(GenericType<T> responseType);
+
+    @Override
+    Flowable<Response> method(String name);
+
+    @Override
+    <T> Flowable<T> method(String name, Class<T> responseType);
+
+    @Override
+    <T> Flowable<T> method(String name, GenericType<T> responseType);
+
+    @Override
+    Flowable<Response> method(String name, Entity<?> entity);
+
+    @Override
+    <T> Flowable<T> method(String name, Entity<?> entity, Class<T> responseType);
+
+    @Override
+    <T> Flowable<T> method(String name, Entity<?> entity, GenericType<T> responseType);
+}
+

http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerImpl.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerImpl.java
new file mode 100644
index 0000000..be5c2d5
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerImpl.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx2.client;
+
+import java.util.concurrent.ExecutorService;
+
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.Response;
+
+import org.apache.cxf.jaxrs.client.WebClient;
+
+import io.reactivex.Flowable;
+import io.reactivex.Scheduler;
+import io.reactivex.schedulers.Schedulers;
+
+
+public class FlowableRxInvokerImpl implements FlowableRxInvoker {
+    private Scheduler sc;
+    private WebClient wc;
+    public FlowableRxInvokerImpl(WebClient wc, ExecutorService ex) {
+        this.wc = wc;
+        this.sc = ex == null ? null : Schedulers.from(ex);
+    }
+
+    @Override
+    public Flowable<Response> get() {
+        return get(Response.class);
+    }
+
+    @Override
+    public <T> Flowable<T> get(Class<T> responseType) {
+        return method(HttpMethod.GET, responseType);
+    }
+
+    @Override
+    public <T> Flowable<T> get(GenericType<T> responseType) {
+        return method(HttpMethod.GET, responseType);
+    }
+
+    @Override
+    public Flowable<Response> put(Entity<?> entity) {
+        return put(entity, Response.class);
+    }
+
+    @Override
+    public <T> Flowable<T> put(Entity<?> entity, Class<T> responseType) {
+        return method(HttpMethod.PUT, entity, responseType);
+    }
+
+    @Override
+    public <T> Flowable<T> put(Entity<?> entity, GenericType<T> responseType) {
+        return method(HttpMethod.PUT, entity, responseType);
+    }
+
+    @Override
+    public Flowable<Response> post(Entity<?> entity) {
+        return post(entity, Response.class);
+    }
+
+    @Override
+    public <T> Flowable<T> post(Entity<?> entity, Class<T> responseType) {
+        return method(HttpMethod.POST, entity, responseType);
+    }
+
+    @Override
+    public <T> Flowable<T> post(Entity<?> entity, GenericType<T> responseType) {
+        return method(HttpMethod.POST, entity, responseType);
+    }
+
+    @Override
+    public Flowable<Response> delete() {
+        return delete(Response.class);
+    }
+
+    @Override
+    public <T> Flowable<T> delete(Class<T> responseType) {
+        return method(HttpMethod.DELETE, responseType);
+    }
+
+    @Override
+    public <T> Flowable<T> delete(GenericType<T> responseType) {
+        return method(HttpMethod.DELETE, responseType);
+    }
+
+    @Override
+    public Flowable<Response> head() {
+        return method(HttpMethod.HEAD);
+    }
+
+    @Override
+    public Flowable<Response> options() {
+        return options(Response.class);
+    }
+
+    @Override
+    public <T> Flowable<T> options(Class<T> responseType) {
+        return method(HttpMethod.OPTIONS, responseType);
+    }
+
+    @Override
+    public <T> Flowable<T> options(GenericType<T> responseType) {
+        return method(HttpMethod.OPTIONS, responseType);
+    }
+
+    @Override
+    public Flowable<Response> trace() {
+        return trace(Response.class);
+    }
+
+    @Override
+    public <T> Flowable<T> trace(Class<T> responseType) {
+        return method("TRACE", responseType);
+    }
+
+    @Override
+    public <T> Flowable<T> trace(GenericType<T> responseType) {
+        return method("TRACE", responseType);
+    }
+
+    @Override
+    public Flowable<Response> method(String name) {
+        return method(name, Response.class);
+    }
+
+    @Override
+    public Flowable<Response> method(String name, Entity<?> entity) {
+        return method(name, entity, Response.class);
+    }
+
+    @Override
+    public <T> Flowable<T> method(String name, Entity<?> entity, Class<T> responseType) {
+        if (sc == null) {
+            return Flowable.fromFuture(wc.async().method(name, entity, responseType));
+        }
+        return Flowable.fromFuture(wc.async().method(name, entity, responseType), sc);
+    }
+
+    @Override
+    public <T> Flowable<T> method(String name, Entity<?> entity, GenericType<T> responseType) {
+        if (sc == null) {
+            return Flowable.fromFuture(wc.async().method(name, entity, responseType));
+        }
+        return Flowable.fromFuture(wc.async().method(name, entity, responseType), sc);
+    }
+
+    @Override
+    public <T> Flowable<T> method(String name, Class<T> responseType) {
+        if (sc == null) {
+            return Flowable.fromFuture(wc.async().method(name, responseType));
+        }
+        return Flowable.fromFuture(wc.async().method(name, responseType), sc);
+    }
+
+    @Override
+    public <T> Flowable<T> method(String name, GenericType<T> responseType) {
+        if (sc == null) {
+            return Flowable.fromFuture(wc.async().method(name, responseType));
+        }
+        return Flowable.fromFuture(wc.async().method(name, responseType), sc);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerProvider.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerProvider.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerProvider.java
new file mode 100644
index 0000000..e4e0e71
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerProvider.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx2.client;
+
+import java.util.concurrent.ExecutorService;
+
+import javax.ws.rs.client.RxInvokerProvider;
+import javax.ws.rs.client.SyncInvoker;
+import javax.ws.rs.ext.Provider;
+
+import org.apache.cxf.jaxrs.client.SyncInvokerImpl;
+
+@Provider
+public class FlowableRxInvokerProvider implements RxInvokerProvider<FlowableRxInvoker> {
+
+    @Override
+    public FlowableRxInvoker getRxInvoker(SyncInvoker syncInvoker, ExecutorService executorService) {
+        // TODO: At the moment we still delegate if possible to the async HTTP conduit.
+        // Investigate if letting the RxJava thread pool deal with the sync invocation
+        // is indeed more effective
+        return new FlowableRxInvokerImpl(((SyncInvokerImpl)syncInvoker).getWebClient(), executorService);
+    }
+
+    @Override
+    public boolean isProviderFor(Class<?> rxCls) {
+        return FlowableRxInvoker.class == rxCls;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/AbstractAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/AbstractAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/AbstractAsyncSubscriber.java
new file mode 100644
index 0000000..17664c2
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/AbstractAsyncSubscriber.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx2.server;
+
+import java.util.List;
+
+import javax.ws.rs.container.AsyncResponse;
+
+import org.apache.cxf.jaxrs.ext.StreamingResponse;
+
+import io.reactivex.subscribers.DefaultSubscriber;
+
+public abstract class AbstractAsyncSubscriber<T> extends DefaultSubscriber<T> {
+
+    private AsyncResponse ar;
+
+    protected AbstractAsyncSubscriber(AsyncResponse ar) {
+        this.ar = ar;
+    }
+    public void resume(T response) {
+        ar.resume(response);
+    }
+
+    public void resume(List<T> response) {
+        ar.resume(response);
+    }
+
+    public void resume(StreamingResponse<T> response) {
+        ar.resume(response);
+    }
+
+    @Override
+    public void onError(Throwable t) {
+        ar.resume(t);
+    }
+
+    protected AsyncResponse getAsyncResponse() {
+        return ar;
+    }
+
+    @Override
+    public void onStart() {
+        requestNext();
+    }
+    
+    protected void requestNext() {
+        super.request(1);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/FlowableInvoker.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/FlowableInvoker.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/FlowableInvoker.java
new file mode 100644
index 0000000..1ff7491
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/FlowableInvoker.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx2.server;
+
+import org.apache.cxf.jaxrs.JAXRSInvoker;
+import org.apache.cxf.jaxrs.impl.AsyncResponseImpl;
+import org.apache.cxf.message.Message;
+
+import io.reactivex.Flowable;
+
+public class FlowableInvoker extends JAXRSInvoker {
+    protected AsyncResponseImpl checkFutureResponse(Message inMessage, Object result) {
+        if (result instanceof Flowable) {
+            final Flowable<?> f = (Flowable<?>)result;
+            final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
+            f.subscribe(v -> asyncResponse.resume(v), t -> handleThrowable(asyncResponse, t));
+            return asyncResponse;
+        }
+        return null;
+    }
+
+    private Object handleThrowable(AsyncResponseImpl asyncResponse, Throwable t) {
+        //TODO: if it is a Cancelation exception => asyncResponse.cancel(); 
+        asyncResponse.resume(t);
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/JsonStreamingAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/JsonStreamingAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/JsonStreamingAsyncSubscriber.java
new file mode 100644
index 0000000..c809799
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/JsonStreamingAsyncSubscriber.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx2.server;
+
+import javax.ws.rs.container.AsyncResponse;
+
+/**
+ * {@link StreamingAsyncSubscriber} preconfigured with {@code "["} as the opening tag,
+ * {@code "]"} as the closing tag and {@code ","} as the separator.
+ */
+public class JsonStreamingAsyncSubscriber<T> extends StreamingAsyncSubscriber<T> {
+    /**
+     * Creates a subscriber with a poll timeout of 1000 milliseconds and an
+     * asynchronous timeout of 0.
+     *
+     * @param ar the asynchronous response to stream to
+     */
+    public JsonStreamingAsyncSubscriber(AsyncResponse ar) {
+        this(ar, 1000, 0);
+    }
+
+    /**
+     * Creates a subscriber with an asynchronous timeout of 0.
+     *
+     * @param ar the asynchronous response to stream to
+     * @param pollTimeout the queue poll timeout, in milliseconds
+     */
+    public JsonStreamingAsyncSubscriber(AsyncResponse ar, long pollTimeout) {
+        this(ar, pollTimeout, 0);
+    }
+
+    /**
+     * @param ar the asynchronous response to stream to
+     * @param pollTimeout the queue poll timeout, in milliseconds
+     * @param asyncTimeout the asynchronous response timeout, in milliseconds
+     */
+    public JsonStreamingAsyncSubscriber(AsyncResponse ar, long pollTimeout, long asyncTimeout) {
+        super(ar, "[", "]", ",", pollTimeout, asyncTimeout);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/StreamingAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/StreamingAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/StreamingAsyncSubscriber.java
new file mode 100644
index 0000000..f9eafe4
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/StreamingAsyncSubscriber.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx2.server;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.TimeoutHandler;
+
+import org.apache.cxf.common.util.StringUtils;
+import org.apache.cxf.jaxrs.ext.StreamingResponse;
+
+/**
+ * Subscriber which streams the items it receives to the client.
+ * <p>
+ * Items passed to {@link #onNext(Object)} are put on an internal queue. The
+ * {@link StreamingResponse} the {@link AsyncResponse} is resumed with drains that
+ * queue and frames the items with the optional opening tag, separator and closing tag.
+ * <p>
+ * With an {@code asyncTimeout} of 0 the response is resumed as soon as the
+ * subscription starts. With a positive {@code asyncTimeout} the response stays
+ * suspended until the first item arrives; whenever the timeout expires before that,
+ * it is re-armed unless an item is queued or the flow has completed.
+ *
+ * @param <T> the type of the streamed items
+ */
+public class StreamingAsyncSubscriber<T> extends AbstractAsyncSubscriber<T> {
+
+    private final BlockingQueue<T> queue = new LinkedBlockingQueue<T>();
+    private final String openTag;
+    private final String closeTag;
+    private final String separator;
+    private final long pollTimeout;
+    private final long asyncTimeout;
+    private volatile boolean completed;
+    private final AtomicBoolean firstWriteDone = new AtomicBoolean();
+
+    /**
+     * Creates a subscriber with a poll timeout of 1000 milliseconds and no
+     * asynchronous timeout.
+     *
+     * @param ar the asynchronous response to stream to
+     * @param openTag written before the first item, may be {@code null}
+     * @param closeTag written after the last item, may be {@code null}
+     * @param sep written between two items, may be {@code null}
+     */
+    public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep) {
+        // Pass the caller's separator on; it used to be replaced with ""
+        this(ar, openTag, closeTag, sep, 1000);
+    }
+
+    /**
+     * Creates a subscriber with no asynchronous timeout.
+     *
+     * @param ar the asynchronous response to stream to
+     * @param openTag written before the first item, may be {@code null}
+     * @param closeTag written after the last item, may be {@code null}
+     * @param sep written between two items, may be {@code null}
+     * @param pollTimeout the queue poll timeout, in milliseconds
+     */
+    public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep,
+                                    long pollTimeout) {
+        this(ar, openTag, closeTag, sep, pollTimeout, 0);
+    }
+
+    /**
+     * @param ar the asynchronous response to stream to
+     * @param openTag written before the first item, may be {@code null}
+     * @param closeTag written after the last item, may be {@code null}
+     * @param sep written between two items, may be {@code null}
+     * @param pollTimeout the queue poll timeout, in milliseconds
+     * @param asyncTimeout the asynchronous response timeout, in milliseconds;
+     *        values which are not positive disable it
+     */
+    public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep,
+                                    long pollTimeout, long asyncTimeout) {
+        super(ar);
+        this.openTag = openTag;
+        this.closeTag = closeTag;
+        this.separator = sep;
+        this.pollTimeout = pollTimeout;
+        // Keep the requested timeout (it used to be overwritten with 0); non-positive
+        // values are normalized to 0 so that onStart() still resumes the response
+        this.asyncTimeout = asyncTimeout > 0 ? asyncTimeout : 0;
+        if (this.asyncTimeout > 0) {
+            ar.setTimeout(this.asyncTimeout, TimeUnit.MILLISECONDS);
+            ar.setTimeoutHandler(new TimeoutHandlerImpl());
+        }
+    }
+
+    /**
+     * Resumes the response immediately when no asynchronous timeout is configured.
+     */
+    @Override
+    public void onStart() {
+        if (asyncTimeout == 0) {
+            resumeAsyncResponse();
+        }
+        super.onStart();
+    }
+
+    private void resumeAsyncResponse() {
+        super.resume(new StreamingResponseImpl());
+    }
+
+    /**
+     * Marks the flow as completed so that the streaming loop ends once the queue is drained.
+     */
+    @Override
+    public void onComplete() {
+        completed = true;
+    }
+
+    /**
+     * Queues the item for streaming and requests the next one. When an asynchronous
+     * timeout is configured and the response is still suspended, it is resumed first.
+     */
+    @Override
+    public void onNext(T bean) {
+        if (asyncTimeout > 0 && getAsyncResponse().isSuspended()) {
+            resumeAsyncResponse();
+        }
+        queue.add(bean);
+        super.requestNext();
+    }
+
+    /**
+     * Writes the queued items until the flow has completed and the queue is empty.
+     */
+    private class StreamingResponseImpl implements StreamingResponse<T> {
+
+        @Override
+        public void writeTo(Writer<T> writer) throws IOException {
+            if (openTag != null) {
+                writer.getEntityStream().write(StringUtils.toBytesUTF8(openTag));
+            }
+            while (!completed || !queue.isEmpty()) {
+                try {
+                    T bean = queue.poll(pollTimeout, TimeUnit.MILLISECONDS);
+                    if (bean != null) {
+                        // The separator goes before every item but the first one; like
+                        // the tags it is optional and skipped when null
+                        if (firstWriteDone.getAndSet(true) && separator != null) {
+                            writer.getEntityStream().write(StringUtils.toBytesUTF8(separator));
+                        }
+                        writer.write(bean);
+                    }
+                } catch (InterruptedException ex) {
+                    // ignore: an interrupt only cuts the current poll short,
+                    // the loop condition is checked again
+                }
+            }
+            if (closeTag != null) {
+                writer.getEntityStream().write(StringUtils.toBytesUTF8(closeTag));
+            }
+        }
+
+    }
+
+    /**
+     * Re-arms the timeout while there is nothing to stream yet, otherwise resumes the response.
+     */
+    public class TimeoutHandlerImpl implements TimeoutHandler {
+
+        @Override
+        public void handleTimeout(AsyncResponse asyncResponse) {
+            // A completed flow will never queue another item, so waiting any
+            // longer would leave the response suspended forever
+            if (queue.isEmpty() && !completed) {
+                asyncResponse.setTimeout(asyncTimeout, TimeUnit.MILLISECONDS);
+            } else {
+                resumeAsyncResponse();
+            }
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2FlowableTest.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2FlowableTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2FlowableTest.java
new file mode 100644
index 0000000..6e89960
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2FlowableTest.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import javax.ws.rs.NotFoundException;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Invocation;
+import javax.ws.rs.core.GenericType;
+import javax.xml.ws.Holder;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.jaxrs.client.WebClient;
+import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
+import org.apache.cxf.jaxrs.rx2.client.FlowableRxInvoker;
+import org.apache.cxf.jaxrs.rx2.client.FlowableRxInvokerProvider;
+import org.apache.cxf.jaxrs.rx2.client.ObservableRxInvoker;
+import org.apache.cxf.jaxrs.rx2.client.ObservableRxInvokerProvider;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import io.reactivex.Flowable;
+import io.reactivex.Observable;
+
+/**
+ * System tests for the RxJava2 {@link Flowable} support, run against
+ * {@link RxJava2FlowableServer}.
+ * <p>
+ * Reactive results are consumed with blocking operators and verified on the test
+ * thread: an assertion failing inside an Rx callback is routed to the Rx error
+ * handling instead of failing the test.
+ */
+public class JAXRSRxJava2FlowableTest extends AbstractBusClientServerTestBase {
+    public static final String PORT = RxJava2FlowableServer.PORT;
+    @BeforeClass
+    public static void startServers() throws Exception {
+        AbstractResourceInfo.clearAllMaps();
+        assertTrue("server did not launch correctly",
+                   launchServer(RxJava2FlowableServer.class, true));
+        createStaticBus();
+    }
+    @Test
+    public void testGetHelloWorldAsyncText() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx2/flowable/textAsync";
+        WebClient wc = WebClient.create(address);
+        String text = wc.accept("text/plain").get(String.class);
+        assertEquals("Hello, world!", text);
+    }
+
+    @Test
+    public void testGetHelloWorldJson() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx2/flowable/textJson";
+        List<Object> providers = new LinkedList<>();
+        providers.add(new JacksonJsonProvider());
+        providers.add(new FlowableRxInvokerProvider());
+        WebClient wc = WebClient.create(address, providers);
+        Flowable<HelloWorldBean> obs = wc.accept("application/json")
+            .rx(FlowableRxInvoker.class)
+            .get(HelloWorldBean.class);
+
+        // Wait for the item itself rather than sleeping for a fixed time;
+        // blockingFirst() throws if the flow fails or completes empty
+        HelloWorldBean bean = obs.blockingFirst();
+        assertEquals("Hello", bean.getGreeting());
+        assertEquals("World", bean.getAudience());
+    }
+
+    @Test
+    public void testGetHelloWorldJsonImplicitListAsync() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx2/flowable/textJsonImplicitListAsync";
+        doTestGetHelloWorldJsonList(address);
+    }
+    @Test
+    public void testGetHelloWorldJsonImplicitListAsyncStream() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx2/flowable/textJsonImplicitListAsyncStream";
+        doTestGetHelloWorldJsonList(address);
+    }
+    private void doTestGetHelloWorldJsonList(String address) throws Exception {
+        WebClient wc = WebClient.create(address,
+                                        Collections.singletonList(new JacksonJsonProvider()));
+        WebClient.getConfig(wc).getHttpConduit().getClient().setReceiveTimeout(10000000);
+        GenericType<List<HelloWorldBean>> genericResponseType = new GenericType<List<HelloWorldBean>>() {
+        };
+
+        List<HelloWorldBean> beans = wc.accept("application/json").get(genericResponseType);
+        assertEquals(2, beans.size());
+        assertEquals("Hello", beans.get(0).getGreeting());
+        assertEquals("World", beans.get(0).getAudience());
+        assertEquals("Ciao", beans.get(1).getGreeting());
+        assertEquals("World", beans.get(1).getAudience());
+    }
+
+    @Test
+    public void testGetHelloWorldAsyncObservable() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx2/flowable/textAsync";
+        WebClient wc = WebClient.create(address,
+                                        Collections.singletonList(new ObservableRxInvokerProvider()));
+        Observable<String> obs = wc.accept("text/plain")
+            .rx(ObservableRxInvoker.class)
+            .get(String.class);
+
+        // Assert on the test thread so that a mismatch actually fails the test
+        String doubled = obs.map(s -> s + s).blockingFirst();
+        assertDuplicateResponse(doubled);
+    }
+    @Test
+    public void testGetHelloWorldAsyncObservable404() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx2/flowable/textAsync404";
+        Invocation.Builder b = ClientBuilder.newClient().register(new ObservableRxInvokerProvider())
+            .target(address).request();
+
+        // Only record the outcome in the callbacks and verify it once the
+        // observable has terminated
+        Holder<String> received = new Holder<String>();
+        Holder<Throwable> failure = new Holder<Throwable>();
+        b.rx(ObservableRxInvoker.class).get(String.class).blockingSubscribe(
+            s -> {
+                received.value = s;
+            },
+            t -> {
+                failure.value = t;
+            });
+        if (received.value != null) {
+            fail("Exception expected");
+        }
+        assertTrue("Exception expected", failure.value instanceof ExecutionException);
+        validateT((ExecutionException)failure.value);
+    }
+
+    private void validateT(ExecutionException t) {
+        assertTrue(t.getCause() instanceof NotFoundException);
+    }
+    private void assertDuplicateResponse(String s) {
+        assertEquals("Hello, world!Hello, world!", s);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2ObservableTest.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2ObservableTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2ObservableTest.java
new file mode 100644
index 0000000..b9fa5ee
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2ObservableTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.xml.ws.Holder;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.jaxrs.client.WebClient;
+import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
+import org.apache.cxf.jaxrs.rx2.client.ObservableRxInvoker;
+import org.apache.cxf.jaxrs.rx2.client.ObservableRxInvokerProvider;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import io.reactivex.Observable;
+
+/**
+ * System test for the RxJava2 {@link Observable} client support, run against
+ * {@link RxJava2ObservableServer}.
+ */
+public class JAXRSRxJava2ObservableTest extends AbstractBusClientServerTestBase {
+    public static final String PORT = RxJava2ObservableServer.PORT;
+    @BeforeClass
+    public static void startServers() throws Exception {
+        AbstractResourceInfo.clearAllMaps();
+        assertTrue("server did not launch correctly",
+                   launchServer(RxJava2ObservableServer.class, true));
+        createStaticBus();
+    }
+    @Test
+    public void testGetHelloWorldJson() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx2/observable/textJson";
+        List<Object> providers = new LinkedList<>();
+        providers.add(new JacksonJsonProvider());
+        providers.add(new ObservableRxInvokerProvider());
+        WebClient wc = WebClient.create(address, providers);
+        Observable<HelloWorldBean> obs = wc.accept("application/json")
+            .rx(ObservableRxInvoker.class)
+            .get(HelloWorldBean.class);
+
+        // Wait for the item itself rather than sleeping for a fixed time;
+        // blockingFirst() throws if the observable fails or completes empty
+        Holder<HelloWorldBean> holder = new Holder<HelloWorldBean>();
+        holder.value = obs.blockingFirst();
+        assertEquals("Hello", holder.value.getGreeting());
+        assertEquals("World", holder.value.getAudience());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2Test.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2Test.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2Test.java
deleted file mode 100644
index ded2799..0000000
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2Test.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cxf.systest.jaxrs.reactive;
-
-import java.util.LinkedList;
-import java.util.List;
-
-import javax.xml.ws.Holder;
-
-import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
-
-import org.apache.cxf.jaxrs.client.WebClient;
-import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
-import org.apache.cxf.jaxrs.rx2.client.ObservableRxInvoker;
-import org.apache.cxf.jaxrs.rx2.client.ObservableRxInvokerProvider;
-import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
-
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import io.reactivex.Observable;
-
-public class JAXRSRxJava2Test extends AbstractBusClientServerTestBase {
-    public static final String PORT = RxJava2Server.PORT;
-    @BeforeClass
-    public static void startServers() throws Exception {
-        AbstractResourceInfo.clearAllMaps();
-        assertTrue("server did not launch correctly",
-                   launchServer(RxJava2Server.class, true));
-        createStaticBus();
-    }
-    @Test
-    public void testGetHelloWorldJson() throws Exception {
-        String address = "http://localhost:" + PORT + "/observable2/textJson";
-        List<Object> providers = new LinkedList<>();
-        providers.add(new JacksonJsonProvider());
-        providers.add(new ObservableRxInvokerProvider());
-        WebClient wc = WebClient.create(address, providers);
-        Observable<HelloWorldBean> obs = wc.accept("application/json")
-            .rx(ObservableRxInvoker.class)
-            .get(HelloWorldBean.class);
-        
-        Holder<HelloWorldBean> holder = new Holder<HelloWorldBean>();
-        obs.subscribe(v -> {
-            holder.value = v;
-        });
-        Thread.sleep(3000);
-        assertEquals("Hello", holder.value.getGreeting());
-        assertEquals("World", holder.value.getAudience());
-    }
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJavaObservableTest.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJavaObservableTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJavaObservableTest.java
new file mode 100644
index 0000000..3412001
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJavaObservableTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import java.util.Collections;
+import java.util.List;
+
+import javax.ws.rs.core.GenericType;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.jaxrs.client.WebClient;
+import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * System tests which call the endpoints of {@link RxJavaObservableServer} with a
+ * plain synchronous {@link WebClient}.
+ */
+public class JAXRSRxJavaObservableTest extends AbstractBusClientServerTestBase {
+    public static final String PORT = RxJavaObservableServer.PORT;
+
+    @BeforeClass
+    public static void startServers() throws Exception {
+        AbstractResourceInfo.clearAllMaps();
+        assertTrue("server did not launch correctly",
+                   launchServer(RxJavaObservableServer.class, true));
+        createStaticBus();
+    }
+
+    /** The text endpoint returns the greeting as plain text. */
+    @Test
+    public void testGetHelloWorldText() throws Exception {
+        WebClient client = WebClient.create("http://localhost:" + PORT + "/rx/text");
+        assertEquals("Hello, world!", client.accept("text/plain").get(String.class));
+    }
+
+    /** The JSON endpoint returns a single bean. */
+    @Test
+    public void testGetHelloWorldJson() throws Exception {
+        WebClient client = WebClient.create("http://localhost:" + PORT + "/rx/textJson",
+                                            Collections.singletonList(new JacksonJsonProvider()));
+        HelloWorldBean result = client.accept("application/json").get(HelloWorldBean.class);
+        assertEquals("Hello", result.getGreeting());
+        assertEquals("World", result.getAudience());
+    }
+
+    /** The JSON list endpoint returns both beans in order. */
+    @Test
+    public void testGetHelloWorldJsonList() throws Exception {
+        doTestGetHelloWorldJsonList("http://localhost:" + PORT + "/rx/textJsonList");
+    }
+
+    /**
+     * Reads a JSON list of beans from {@code address} and checks the two expected entries.
+     */
+    private void doTestGetHelloWorldJsonList(String address) throws Exception {
+        WebClient client = WebClient.create(address,
+                                            Collections.singletonList(new JacksonJsonProvider()));
+        WebClient.getConfig(client).getHttpConduit().getClient().setReceiveTimeout(10000000);
+        GenericType<List<HelloWorldBean>> listType = new GenericType<List<HelloWorldBean>>() {
+        };
+
+        List<HelloWorldBean> result = client.accept("application/json").get(listType);
+        assertEquals(2, result.size());
+
+        HelloWorldBean first = result.get(0);
+        assertEquals("Hello", first.getGreeting());
+        assertEquals("World", first.getAudience());
+
+        HelloWorldBean second = result.get(1);
+        assertEquals("Ciao", second.getGreeting());
+        assertEquals("World", second.getAudience());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJavaTest.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJavaTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJavaTest.java
deleted file mode 100644
index 9f197d8..0000000
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJavaTest.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cxf.systest.jaxrs.reactive;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-
-import javax.ws.rs.NotFoundException;
-import javax.ws.rs.client.ClientBuilder;
-import javax.ws.rs.client.Invocation;
-import javax.ws.rs.core.GenericType;
-
-import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
-
-import org.apache.cxf.jaxrs.client.WebClient;
-import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
-import org.apache.cxf.jaxrs.rx.client.ObservableRxInvoker;
-import org.apache.cxf.jaxrs.rx.client.ObservableRxInvokerProvider;
-import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
-
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import rx.Observable;
-
-public class JAXRSRxJavaTest extends AbstractBusClientServerTestBase {
-    public static final String PORT = RxJavaServer.PORT;
-    @BeforeClass
-    public static void startServers() throws Exception {
-        AbstractResourceInfo.clearAllMaps();
-        assertTrue("server did not launch correctly",
-                   launchServer(RxJavaServer.class, true));
-        createStaticBus();
-    }
-    @Test
-    public void testGetHelloWorldText() throws Exception {
-        String address = "http://localhost:" + PORT + "/observable/text";
-        WebClient wc = WebClient.create(address);
-        String text = wc.accept("text/plain").get(String.class);
-        assertEquals("Hello, world!", text);
-    }
-    @Test
-    public void testGetHelloWorldAsyncText() throws Exception {
-        String address = "http://localhost:" + PORT + "/observable/textAsync";
-        WebClient wc = WebClient.create(address);
-        String text = wc.accept("text/plain").get(String.class);
-        assertEquals("Hello, world!", text);
-    }
-
-    @Test
-    public void testGetHelloWorldJson() throws Exception {
-        String address = "http://localhost:" + PORT + "/observable/textJson";
-        WebClient wc = WebClient.create(address,
-                                        Collections.singletonList(new JacksonJsonProvider()));
-        HelloWorldBean bean = wc.accept("application/json").get(HelloWorldBean.class);
-        assertEquals("Hello", bean.getGreeting());
-        assertEquals("World", bean.getAudience());
-    }
-    @Test
-    public void testGetHelloWorldJsonList() throws Exception {
-        String address = "http://localhost:" + PORT + "/observable/textJsonList";
-        doTestGetHelloWorldJsonList(address);
-    }
-    @Test
-    public void testGetHelloWorldJsonImplicitListAsync() throws Exception {
-        String address = "http://localhost:" + PORT + "/observable/textJsonImplicitListAsync";
-        doTestGetHelloWorldJsonList(address);
-    }
-    @Test
-    public void testGetHelloWorldJsonImplicitListAsyncStream() throws Exception {
-        String address = "http://localhost:" + PORT + "/observable/textJsonImplicitListAsyncStream";
-        doTestGetHelloWorldJsonList(address);
-    }
-    private void doTestGetHelloWorldJsonList(String address) throws Exception {
-        WebClient wc = WebClient.create(address,
-                                        Collections.singletonList(new JacksonJsonProvider()));
-        WebClient.getConfig(wc).getHttpConduit().getClient().setReceiveTimeout(10000000);
-        GenericType<List<HelloWorldBean>> genericResponseType = new GenericType<List<HelloWorldBean>>() {
-        };
-
-        List<HelloWorldBean> beans = wc.accept("application/json").get(genericResponseType);
-        assertEquals(2, beans.size());
-        assertEquals("Hello", beans.get(0).getGreeting());
-        assertEquals("World", beans.get(0).getAudience());
-        assertEquals("Ciao", beans.get(1).getGreeting());
-        assertEquals("World", beans.get(1).getAudience());
-    }
-
-    @Test
-    public void testGetHelloWorldAsyncObservable() throws Exception {
-        String address = "http://localhost:" + PORT + "/observable/textAsync";
-        WebClient wc = WebClient.create(address,
-                                        Collections.singletonList(new ObservableRxInvokerProvider()));
-        Observable<String> obs = wc.accept("text/plain")
-            .rx(ObservableRxInvoker.class)
-            .get(String.class);
-        obs.map(s -> {
-            return s + s;
-        });
-
-        Thread.sleep(3000);
-
-        obs.subscribe(s -> assertDuplicateResponse(s));
-    }
-    @Test
-    public void testGetHelloWorldAsyncObservable404() throws Exception {
-        String address = "http://localhost:" + PORT + "/observable/textAsync404";
-        Invocation.Builder b = ClientBuilder.newClient().register(new ObservableRxInvokerProvider())
-            .target(address).request();
-        b.rx(ObservableRxInvoker.class).get(String.class).subscribe(
-            s -> {
-                fail("Exception expected");
-            },
-            t -> validateT((ExecutionException)t));
-    }
-
-    private void validateT(ExecutionException t) {
-        assertTrue(t.getCause() instanceof NotFoundException);
-    }
-    private void assertDuplicateResponse(String s) {
-        assertEquals("Hello, world!Hello, world!", s);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java
new file mode 100644
index 0000000..fe41958
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import java.util.Collections;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.ext.logging.LoggingOutInterceptor;
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
+import org.apache.cxf.jaxrs.provider.StreamingResponseProvider;
+import org.apache.cxf.jaxrs.rx2.server.FlowableInvoker;
+import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
+
+
+public class RxJava2FlowableServer extends AbstractBusTestServerBase {
+    public static final String PORT = allocatePort(RxJava2FlowableServer.class);
+
+    org.apache.cxf.endpoint.Server server;
+    public RxJava2FlowableServer() {
+    }
+
+    protected void run() {
+        Bus bus = BusFactory.getDefaultBus();
+        // Make sure default JSONProvider is not loaded
+        bus.setProperty("skip.default.json.provider.registration", true);
+        JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
+        sf.setInvoker(new FlowableInvoker());
+        sf.setProvider(new JacksonJsonProvider());
+        StreamingResponseProvider<HelloWorldBean> streamProvider = new StreamingResponseProvider<HelloWorldBean>();
+        streamProvider.setProduceMediaTypes(Collections.singletonList("application/json"));
+        sf.setProvider(streamProvider);
+        sf.getOutInterceptors().add(new LoggingOutInterceptor());
+        sf.setResourceClasses(RxJava2FlowableService.class);
+        sf.setResourceProvider(RxJava2FlowableService.class,
+                               new SingletonResourceProvider(new RxJava2FlowableService(), true));
+        sf.setAddress("http://localhost:" + PORT + "/");
+        server = sf.create();
+    }
+
+    public void tearDown() throws Exception {
+        server.stop();
+        server.destroy();
+        server = null;
+    }
+
+    public static void main(String[] args) {
+        try {
+            RxJava2FlowableServer s = new RxJava2FlowableServer();
+            s.start();
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            System.exit(-1);
+        } finally {
+            System.out.println("done!");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableService.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableService.java
new file mode 100644
index 0000000..7812977
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableService.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+
+import org.apache.cxf.jaxrs.rx2.server.AbstractAsyncSubscriber;
+import org.apache.cxf.jaxrs.rx2.server.JsonStreamingAsyncSubscriber;
+
+import io.reactivex.Flowable;
+import io.reactivex.schedulers.Schedulers;
+
+
+@Path("/rx2/flowable")
+public class RxJava2FlowableService {
+
+    
+    @GET
+    @Produces("application/json")
+    @Path("textJson")
+    public Flowable<HelloWorldBean> getJson() {
+        return Flowable.just(new HelloWorldBean());
+    }
+    
+    @GET
+    @Produces("application/json")
+    @Path("textJsonImplicitListAsync")
+    public void getJsonImplicitListAsync(@Suspended AsyncResponse ar) {
+        final HelloWorldBean bean1 = new HelloWorldBean();
+        final HelloWorldBean bean2 = new HelloWorldBean("Ciao");
+        new Thread(new Runnable() {
+            public void run() {
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException ex) {
+                    // ignore
+                }
+                Flowable.just(bean1, bean2).subscribe(new ListAsyncSubscriber<HelloWorldBean>(ar));
+            }
+        }).start();
+
+    }
+    @GET
+    @Produces("application/json")
+    @Path("textJsonImplicitListAsyncStream")
+    public void getJsonImplicitListStreamingAsync(@Suspended AsyncResponse ar) {
+        Flowable.just("Hello", "Ciao")
+            .map(s -> new HelloWorldBean(s))
+            .subscribeOn(Schedulers.computation())
+            .subscribe(new JsonStreamingAsyncSubscriber<HelloWorldBean>(ar));
+    }
+    
+    @GET
+    @Produces("text/plain")
+    @Path("textAsync")
+    public void getTextAsync(@Suspended final AsyncResponse ar) {
+        Flowable.just("Hello, ").map(s -> s + "world!")
+            .subscribe(new StringAsyncSubscriber(ar));
+
+    }
+    
+    private static class StringAsyncSubscriber extends AbstractAsyncSubscriber<String> {
+
+        private StringBuilder sb = new StringBuilder();
+        StringAsyncSubscriber(AsyncResponse ar) {
+            super(ar);
+        }
+        @Override
+        public void onComplete() {
+            super.resume(sb.toString());
+        }
+
+        @Override
+        public void onNext(String s) {
+            sb.append(s);
+            super.requestNext();
+        }
+
+    }
+    
+    private static class ListAsyncSubscriber<T> extends AbstractAsyncSubscriber<T> {
+
+        private List<T> beans = new LinkedList<T>();
+        ListAsyncSubscriber(AsyncResponse ar) {
+            super(ar);
+        }
+        @Override
+        public void onComplete() {
+            super.resume(beans);
+        }
+
+        @Override
+        public void onNext(T bean) {
+            beans.add(bean);
+            super.requestNext();
+        }
+
+    }
+}
+
+

http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableServer.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableServer.java
new file mode 100644
index 0000000..48df030
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableServer.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.ext.logging.LoggingOutInterceptor;
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
+import org.apache.cxf.jaxrs.rx2.server.ObservableInvoker;
+import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
+
+
+public class RxJava2ObservableServer extends AbstractBusTestServerBase {
+    public static final String PORT = allocatePort(RxJava2ObservableServer.class);
+
+    org.apache.cxf.endpoint.Server server;
+    public RxJava2ObservableServer() {
+    }
+
+    protected void run() {
+        Bus bus = BusFactory.getDefaultBus();
+        // Make sure default JSONProvider is not loaded
+        bus.setProperty("skip.default.json.provider.registration", true);
+        JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
+        sf.setInvoker(new ObservableInvoker());
+        sf.setProvider(new JacksonJsonProvider());
+        sf.getOutInterceptors().add(new LoggingOutInterceptor());
+        sf.setResourceClasses(RxJava2ObservableService.class);
+        sf.setResourceProvider(RxJava2ObservableService.class,
+                               new SingletonResourceProvider(new RxJava2ObservableService(), true));
+        sf.setAddress("http://localhost:" + PORT + "/");
+        server = sf.create();
+    }
+
+    public void tearDown() throws Exception {
+        server.stop();
+        server.destroy();
+        server = null;
+    }
+
+    public static void main(String[] args) {
+        try {
+            RxJava2ObservableServer s = new RxJava2ObservableServer();
+            s.start();
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            System.exit(-1);
+        } finally {
+            System.out.println("done!");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java
index 28d053e..abb6e72 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java
@@ -27,7 +27,7 @@ import javax.ws.rs.Produces;
 import io.reactivex.Observable;
 
 
-@Path("/observable2")
+@Path("/rx2/observable")
 public class RxJava2ObservableService {
 
     
@@ -37,7 +37,6 @@ public class RxJava2ObservableService {
     public Observable<HelloWorldBean> getJson() {
         return Observable.just(new HelloWorldBean());
     }
-
 }
 
 

http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2Server.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2Server.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2Server.java
deleted file mode 100644
index f9ab3ae..0000000
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2Server.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cxf.systest.jaxrs.reactive;
-
-import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
-
-import org.apache.cxf.Bus;
-import org.apache.cxf.BusFactory;
-import org.apache.cxf.ext.logging.LoggingOutInterceptor;
-import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
-import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
-import org.apache.cxf.jaxrs.rx2.server.ObservableInvoker;
-import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
-
-
-public class RxJava2Server extends AbstractBusTestServerBase {
-    public static final String PORT = allocatePort(RxJava2Server.class);
-
-    org.apache.cxf.endpoint.Server server;
-    public RxJava2Server() {
-    }
-
-    protected void run() {
-        Bus bus = BusFactory.getDefaultBus();
-        // Make sure default JSONProvider is not loaded
-        bus.setProperty("skip.default.json.provider.registration", true);
-        JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
-        sf.setInvoker(new ObservableInvoker());
-        sf.setProvider(new JacksonJsonProvider());
-        sf.getOutInterceptors().add(new LoggingOutInterceptor());
-        sf.setResourceClasses(RxJava2ObservableService.class);
-        sf.setResourceProvider(RxJava2ObservableService.class,
-                               new SingletonResourceProvider(new RxJava2ObservableService(), true));
-        sf.setAddress("http://localhost:" + PORT + "/");
-        server = sf.create();
-    }
-
-    public void tearDown() throws Exception {
-        server.stop();
-        server.destroy();
-        server = null;
-    }
-
-    public static void main(String[] args) {
-        try {
-            RxJava2Server s = new RxJava2Server();
-            s.start();
-        } catch (Exception ex) {
-            ex.printStackTrace();
-            System.exit(-1);
-        } finally {
-            System.out.println("done!");
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaObservableServer.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaObservableServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaObservableServer.java
new file mode 100644
index 0000000..85d576d
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaObservableServer.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import java.util.Collections;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.ext.logging.LoggingOutInterceptor;
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
+import org.apache.cxf.jaxrs.provider.StreamingResponseProvider;
+import org.apache.cxf.jaxrs.rx.server.ObservableInvoker;
+import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
+
+
+public class RxJavaObservableServer extends AbstractBusTestServerBase {
+    public static final String PORT = allocatePort(RxJavaObservableServer.class);
+
+    org.apache.cxf.endpoint.Server server;
+    public RxJavaObservableServer() {
+    }
+
+    protected void run() {
+        Bus bus = BusFactory.getDefaultBus();
+        // Make sure default JSONProvider is not loaded
+        bus.setProperty("skip.default.json.provider.registration", true);
+        JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
+        sf.setInvoker(new ObservableInvoker());
+        sf.setProvider(new JacksonJsonProvider());
+        StreamingResponseProvider<HelloWorldBean> streamProvider = new StreamingResponseProvider<HelloWorldBean>();
+        streamProvider.setProduceMediaTypes(Collections.singletonList("application/json"));
+        sf.setProvider(streamProvider);
+        sf.getOutInterceptors().add(new LoggingOutInterceptor());
+        sf.setResourceClasses(RxJavaObservableService.class);
+        sf.setResourceProvider(RxJavaObservableService.class,
+                               new SingletonResourceProvider(new RxJavaObservableService(), true));
+        sf.setAddress("http://localhost:" + PORT + "/");
+        server = sf.create();
+    }
+
+    public void tearDown() throws Exception {
+        server.stop();
+        server.destroy();
+        server = null;
+    }
+
+    public static void main(String[] args) {
+        try {
+            RxJavaObservableServer s = new RxJavaObservableServer();
+            s.start();
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            System.exit(-1);
+        } finally {
+            System.out.println("done!");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/253900af/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaObservableService.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaObservableService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaObservableService.java
index de0f91f..a06d207 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaObservableService.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaObservableService.java
@@ -26,18 +26,11 @@ import java.util.List;
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
-import javax.ws.rs.container.AsyncResponse;
-import javax.ws.rs.container.Suspended;
-
-import org.apache.cxf.jaxrs.rx.server.AbstractAsyncSubscriber;
-import org.apache.cxf.jaxrs.rx.server.JsonStreamingAsyncSubscriber;
-import org.apache.cxf.jaxrs.rx.server.ListAsyncSubscriber;
 
 import rx.Observable;
-import rx.schedulers.Schedulers;
 
 
-@Path("/observable")
+@Path("/rx")
 public class RxJavaObservableService {
 
     @GET
@@ -48,15 +41,6 @@ public class RxJavaObservableService {
     }
 
     @GET
-    @Produces("text/plain")
-    @Path("textAsync")
-    public void getTextAsync(@Suspended final AsyncResponse ar) {
-        Observable.just("Hello, ").map(s -> s + "world!")
-            .subscribe(new StringAsyncSubscriber(ar));
-
-    }
-
-    @GET
     @Produces("application/json")
     @Path("textJson")
     public Observable<HelloWorldBean> getJson() {
@@ -65,33 +49,6 @@ public class RxJavaObservableService {
 
     @GET
     @Produces("application/json")
-    @Path("textJsonImplicitListAsync")
-    public void getJsonImplicitListAsync(@Suspended AsyncResponse ar) {
-        final HelloWorldBean bean1 = new HelloWorldBean();
-        final HelloWorldBean bean2 = new HelloWorldBean("Ciao");
-        new Thread(new Runnable() {
-            public void run() {
-                try {
-                    Thread.sleep(2000);
-                } catch (InterruptedException ex) {
-                    // ignore
-                }
-                Observable.just(bean1, bean2).subscribe(new ListAsyncSubscriber<HelloWorldBean>(ar));
-            }
-        }).start();
-
-    }
-    @GET
-    @Produces("application/json")
-    @Path("textJsonImplicitListAsyncStream")
-    public void getJsonImplicitListStreamingAsync(@Suspended AsyncResponse ar) {
-        Observable.just("Hello", "Ciao")
-            .map(s -> new HelloWorldBean(s))
-            .subscribeOn(Schedulers.computation())
-            .subscribe(new JsonStreamingAsyncSubscriber<HelloWorldBean>(ar));
-    }
-    @GET
-    @Produces("application/json")
     @Path("textJsonList")
     public Observable<List<HelloWorldBean>> getJsonList() {
         HelloWorldBean bean1 = new HelloWorldBean();
@@ -100,23 +57,7 @@ public class RxJavaObservableService {
         return Observable.just(Arrays.asList(bean1, bean2));
     }
 
-    private class StringAsyncSubscriber extends AbstractAsyncSubscriber<String> {
-
-        private StringBuilder sb = new StringBuilder();
-        StringAsyncSubscriber(AsyncResponse ar) {
-            super(ar);
-        }
-        @Override
-        public void onCompleted() {
-            super.resume(sb.toString());
-        }
-
-        @Override
-        public void onNext(String s) {
-            sb.append(s);
-        }
-
-    }
+    
 }