You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by se...@apache.org on 2016/09/02 15:20:13 UTC

cxf git commit: [CXF-6833] Moving RxJava code into its own module

Repository: cxf
Updated Branches:
  refs/heads/master 33cb4b0aa -> 6d82b75eb


[CXF-6833] Moving RxJava code into its own module


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/6d82b75e
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/6d82b75e
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/6d82b75e

Branch: refs/heads/master
Commit: 6d82b75eb982f13f5f071178aeef66e55b0e3fd4
Parents: 33cb4b0
Author: Sergey Beryozkin <sb...@gmail.com>
Authored: Fri Sep 2 16:19:56 2016 +0100
Committer: Sergey Beryozkin <sb...@gmail.com>
Committed: Fri Sep 2 16:19:56 2016 +0100

----------------------------------------------------------------------
 rt/rs/extensions/providers/pom.xml              |   7 --
 .../provider/rx/AbstractAsyncSubscriber.java    |  56 ---------
 .../rx/JsonStreamingAsyncSubscriber.java        |  34 -----
 .../jaxrs/provider/rx/ListAsyncSubscriber.java  |  42 -------
 .../cxf/jaxrs/provider/rx/ObservableReader.java |  61 ---------
 .../cxf/jaxrs/provider/rx/ObservableWriter.java | 119 ------------------
 .../provider/rx/StreamingAsyncSubscriber.java   | 125 -------------------
 rt/rs/extensions/rx/pom.xml                     |  53 ++++++++
 .../cxf/jaxrs/rx/AbstractAsyncSubscriber.java   |  56 +++++++++
 .../jaxrs/rx/JsonStreamingAsyncSubscriber.java  |  34 +++++
 .../cxf/jaxrs/rx/ListAsyncSubscriber.java       |  42 +++++++
 .../apache/cxf/jaxrs/rx/ObservableReader.java   |  61 +++++++++
 .../apache/cxf/jaxrs/rx/ObservableWriter.java   | 119 ++++++++++++++++++
 .../cxf/jaxrs/rx/StreamingAsyncSubscriber.java  | 124 ++++++++++++++++++
 .../cxf/jaxrs/rx/ObservableWriterTest.java      |  32 +++++
 rt/rs/pom.xml                                   |   1 +
 systests/jaxrs/pom.xml                          |   5 +
 .../jaxrs/reactive/JAXRSReactiveTest.java       |   2 +-
 .../systest/jaxrs/reactive/ReactiveServer.java  |   2 +-
 .../systest/jaxrs/reactive/ReactiveService.java |   6 +-
 20 files changed, 532 insertions(+), 449 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/6d82b75e/rt/rs/extensions/providers/pom.xml
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/providers/pom.xml b/rt/rs/extensions/providers/pom.xml
index e37182c..b379e7c 100644
--- a/rt/rs/extensions/providers/pom.xml
+++ b/rt/rs/extensions/providers/pom.xml
@@ -120,13 +120,6 @@
             <optional>true</optional>
         </dependency>     
         <dependency>
-          <groupId>io.reactivex</groupId>
-          <artifactId>rxjava</artifactId>
-          <version>${cxf.rx.java.version}</version>
-          <scope>provided</scope>
-          <optional>true</optional>
-        </dependency>
-        <dependency>
             <groupId>org.apache.cxf</groupId>
             <artifactId>cxf-rt-bindings-soap</artifactId>
             <version>${project.version}</version>

http://git-wip-us.apache.org/repos/asf/cxf/blob/6d82b75e/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/AbstractAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/AbstractAsyncSubscriber.java b/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/AbstractAsyncSubscriber.java
deleted file mode 100644
index c49144f..0000000
--- a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/AbstractAsyncSubscriber.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.provider.rx;
-
-import java.util.List;
-
-import javax.ws.rs.container.AsyncResponse;
-
-import org.apache.cxf.jaxrs.ext.StreamingResponse;
-
-import rx.Subscriber;
-
-public abstract class AbstractAsyncSubscriber<T> extends Subscriber<T> {
-    
-    private AsyncResponse ar;
-    
-    protected AbstractAsyncSubscriber(AsyncResponse ar) {
-        this.ar = ar;
-    }
-    public void resume(T response) {
-        ar.resume(response);
-    }
-    
-    public void resume(List<T> response) {
-        ar.resume(response);
-    }
-    
-    public void resume(StreamingResponse<T> response) {
-        ar.resume(response);
-    }
-
-    @Override
-    public void onError(Throwable t) {
-        ar.resume(t);
-    }
-    
-    protected AsyncResponse getAsyncResponse() {
-        return ar;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/6d82b75e/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/JsonStreamingAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/JsonStreamingAsyncSubscriber.java b/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/JsonStreamingAsyncSubscriber.java
deleted file mode 100644
index 27384b1..0000000
--- a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/JsonStreamingAsyncSubscriber.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.provider.rx;
-
-import javax.ws.rs.container.AsyncResponse;
-
-public class JsonStreamingAsyncSubscriber<T> extends StreamingAsyncSubscriber<T> {
-    public JsonStreamingAsyncSubscriber(AsyncResponse ar) {
-        this(ar, 1000);
-    }
-    public JsonStreamingAsyncSubscriber(AsyncResponse ar, long pollTimeout) {
-        this(ar, pollTimeout, 0);
-    }
-    public JsonStreamingAsyncSubscriber(AsyncResponse ar, long pollTimeout, long asyncTimeout) {
-        super(ar, "[", "]", ",", pollTimeout, asyncTimeout);
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/6d82b75e/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ListAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ListAsyncSubscriber.java b/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ListAsyncSubscriber.java
deleted file mode 100644
index 6bfb1cb..0000000
--- a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ListAsyncSubscriber.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.provider.rx;
-
-import java.util.LinkedList;
-import java.util.List;
-
-import javax.ws.rs.container.AsyncResponse;
-
-public class ListAsyncSubscriber<T> extends AbstractAsyncSubscriber<T> {
-    
-    private List<T> beans = new LinkedList<T>();
-    public ListAsyncSubscriber(AsyncResponse ar) {
-        super(ar);
-    }
-    @Override
-    public void onCompleted() {
-        super.resume(beans);
-    }
-
-    @Override
-    public void onNext(T bean) {
-        beans.add(bean);
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/6d82b75e/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ObservableReader.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ObservableReader.java b/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ObservableReader.java
deleted file mode 100644
index 8a63311..0000000
--- a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ObservableReader.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.provider.rx;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.lang.annotation.Annotation;
-import java.lang.reflect.Type;
-
-import javax.ws.rs.ProcessingException;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.MultivaluedMap;
-import javax.ws.rs.ext.MessageBodyReader;
-import javax.ws.rs.ext.Providers;
-
-import org.apache.cxf.jaxrs.utils.InjectionUtils;
-
-import rx.Observable;
-
-public class ObservableReader<T> implements MessageBodyReader<Observable<T>> {
-    
-    @Context
-    private Providers providers;
-    
-    @Override
-    public boolean isReadable(Class<?> arg0, Type arg1, Annotation[] arg2, MediaType arg3) {
-        return true;
-    }
-
-    @Override
-    public Observable<T> readFrom(Class<Observable<T>> cls, Type t, Annotation[] anns, MediaType mt,
-                                  MultivaluedMap<String, String> headers, InputStream is)
-                                      throws IOException, WebApplicationException {
-        @SuppressWarnings("unchecked")
-        Class<T> actualCls = (Class<T>)InjectionUtils.getActualType(t);
-        final MessageBodyReader<T> mbr = 
-            (MessageBodyReader<T>)providers.getMessageBodyReader(actualCls, actualCls, anns, mt);
-        if (mbr == null) {
-            throw new ProcessingException("MBR is null");
-        }
-        return Observable.just(mbr.readFrom(actualCls, actualCls, anns, mt, headers, is));
-    }
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/6d82b75e/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ObservableWriter.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ObservableWriter.java b/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ObservableWriter.java
deleted file mode 100644
index 6317506..0000000
--- a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ObservableWriter.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.provider.rx;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.lang.annotation.Annotation;
-import java.lang.reflect.Type;
-import java.util.LinkedList;
-import java.util.List;
-
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.MultivaluedMap;
-import javax.ws.rs.ext.MessageBodyWriter;
-import javax.ws.rs.ext.Provider;
-import javax.ws.rs.ext.Providers;
-
-import org.apache.cxf.jaxrs.utils.ExceptionUtils;
-import org.apache.cxf.jaxrs.utils.ParameterizedCollectionType;
-
-import rx.Observable;
-
-@Provider
-public class ObservableWriter<T> implements MessageBodyWriter<Observable<T>> {
-    
-    @Context
-    private Providers providers;
-    private boolean writeSingleElementAsList;
-    
-    @Override
-    public long getSize(Observable<T> arg0, Class<?> arg1, Type arg2, Annotation[] arg3, MediaType arg4) {
-        // TODO Auto-generated method stub
-        return -1;
-    }
-
-    @Override
-    public boolean isWriteable(Class<?> arg0, Type arg1, Annotation[] arg2, MediaType arg3) {
-        return true;
-    }
-
-    @Override
-    public void writeTo(Observable<T> obs, Class<?> cls, Type t, Annotation[] anns, MediaType mt,
-                        MultivaluedMap<String, Object> headers, OutputStream os)
-                            throws IOException, WebApplicationException {
-        List<T> entities = new LinkedList<T>();
-        obs.subscribe(value -> entities.add(value),
-            throwable -> throwError(throwable));
-        if (!entities.isEmpty()) {
-            
-            if (entities.get(0) instanceof List) {
-                List<T> allEntities = new LinkedList<T>();
-                for (T obj : entities) {
-                    @SuppressWarnings("unchecked")
-                    List<T> listT = (List<T>)obj;
-                    allEntities.addAll(listT);
-                }
-                writeToOutputStream(allEntities, anns, mt, headers, os);
-            } else if (entities.size() > 1 || writeSingleElementAsList) {
-                writeToOutputStream(entities, anns, mt, headers, os);
-            } else {
-                writeToOutputStream(entities.get(0), anns, mt, headers, os);
-            }
-        }
-    }
-
-    private void writeToOutputStream(Object value,
-                                     Annotation[] anns,
-                                     MediaType mt,
-                                     MultivaluedMap<String, Object> headers, 
-                                     OutputStream os) {
-        Class<?> valueCls = value.getClass();
-        Type valueType = null;
-        if (value instanceof List) {
-            List<?> list = (List<?>)value;
-            valueType = new ParameterizedCollectionType(list.isEmpty() ? Object.class : list.get(0).getClass());
-        } else {
-            valueType = valueCls;
-        }
-        @SuppressWarnings("unchecked")
-        MessageBodyWriter<Object> writer = 
-            (MessageBodyWriter<Object>)providers.getMessageBodyWriter(valueCls, valueType, anns, mt);
-        if (writer == null) {
-            throwError(null);
-        }
-    
-        try {
-            writer.writeTo(value, valueCls, valueType, anns, mt, headers, os);    
-        } catch (IOException ex) {
-            throwError(ex);
-        }
-    }
-    
-    private static void throwError(Throwable cause) {
-        throw ExceptionUtils.toInternalServerErrorException(cause, null);
-    }
-
-    public void setWriteSingleElementAsList(boolean writeSingleElementAsList) {
-        this.writeSingleElementAsList = writeSingleElementAsList;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/6d82b75e/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/StreamingAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/StreamingAsyncSubscriber.java b/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/StreamingAsyncSubscriber.java
deleted file mode 100644
index 48a5ac4..0000000
--- a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/StreamingAsyncSubscriber.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.provider.rx;
-
-import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import javax.ws.rs.container.AsyncResponse;
-import javax.ws.rs.container.TimeoutHandler;
-
-import org.apache.commons.codec.binary.StringUtils;
-import org.apache.cxf.jaxrs.ext.StreamingResponse;
-
-public class StreamingAsyncSubscriber<T> extends AbstractAsyncSubscriber<T> {
-    
-    private BlockingQueue<T> queue = new LinkedBlockingQueue<T>();
-    private String openTag;
-    private String closeTag;
-    private String separator;
-    private long pollTimeout;
-    private long asyncTimeout;
-    private volatile boolean completed;
-    private volatile boolean firstWriteDone;
-    public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep) {
-        this(ar, openTag, closeTag, "", 1000);
-    }
-    public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep, 
-                                    long pollTimeout) {
-        this(ar, openTag, closeTag, sep, pollTimeout, 0);
-    }
-    public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep, 
-                                    long pollTimeout, long asyncTimeout) {
-        super(ar);
-        this.openTag = openTag;
-        this.closeTag = closeTag;
-        this.separator = sep;
-        this.pollTimeout = pollTimeout;
-        this.asyncTimeout = 0;
-        if (asyncTimeout > 0) {
-            ar.setTimeout(asyncTimeout, TimeUnit.MILLISECONDS);
-            ar.setTimeoutHandler(new TimeoutHandlerImpl());
-        }
-    }
-    @Override
-    public void onStart() {
-        if (asyncTimeout == 0) {
-            resumeAsyncResponse();
-        }
-    }
-    private void resumeAsyncResponse() {
-        super.resume(new StreamingResponseImpl());
-    }
-    @Override
-    public void onCompleted() {
-        completed = true;
-    }
-    
-    @Override
-    public void onNext(T bean) {
-        if (asyncTimeout > 0 && getAsyncResponse().isSuspended()) {
-            resumeAsyncResponse();
-        }
-        queue.add(bean);
-    }
-    private class StreamingResponseImpl implements StreamingResponse<T> {
-
-        @Override
-        public void writeTo(Writer<T> writer) throws IOException {
-            if (openTag != null) {
-                writer.getEntityStream().write(StringUtils.getBytesUtf8(openTag));
-            }
-            while (!completed || queue.size() > 0) {
-                try {
-                    T bean = queue.poll(pollTimeout, TimeUnit.MILLISECONDS);
-                    if (bean != null) {
-                        if (firstWriteDone) {
-                            writer.getEntityStream().write(StringUtils.getBytesUtf8(separator));
-                        }
-                        writer.write(bean);
-                        firstWriteDone = true;
-                        
-                    }
-                } catch (InterruptedException ex) {
-                    // ignore
-                }
-            }
-            if (closeTag != null) {
-                writer.getEntityStream().write(StringUtils.getBytesUtf8(closeTag));
-            }
-            
-        }
-
-    }
-    public class TimeoutHandlerImpl implements TimeoutHandler {
-
-        @Override
-        public void handleTimeout(AsyncResponse asyncResponse) {
-            if (queue.isEmpty()) {
-                asyncResponse.setTimeout(asyncTimeout, TimeUnit.MILLISECONDS);
-            } else {
-                resumeAsyncResponse();
-            }
-
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/6d82b75e/rt/rs/extensions/rx/pom.xml
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/pom.xml b/rt/rs/extensions/rx/pom.xml
new file mode 100644
index 0000000..a65484a
--- /dev/null
+++ b/rt/rs/extensions/rx/pom.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied. See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>cxf-rt-rs-extension-rx</artifactId>
+    <packaging>bundle</packaging>
+    <name>Apache CXF JAX-RS Extensions: RxJava</name>
+    <description>Apache CXF JAX-RS Extensions: RxJava</description>
+    <url>http://cxf.apache.org</url>
+    <parent>
+        <groupId>org.apache.cxf</groupId>
+        <artifactId>cxf-parent</artifactId>
+        <version>3.2.0-SNAPSHOT</version>
+        <relativePath>../../../../parent/pom.xml</relativePath>
+    </parent>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-frontend-jaxrs</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>io.reactivex</groupId>
+          <artifactId>rxjava</artifactId>
+          <version>${cxf.rx.java.version}</version>
+          <scope>provided</scope>
+          <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+    
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/cxf/blob/6d82b75e/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/AbstractAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/AbstractAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/AbstractAsyncSubscriber.java
new file mode 100644
index 0000000..80b1592
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/AbstractAsyncSubscriber.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx;
+
+import java.util.List;
+
+import javax.ws.rs.container.AsyncResponse;
+
+import org.apache.cxf.jaxrs.ext.StreamingResponse;
+
+import rx.Subscriber;
+
+public abstract class AbstractAsyncSubscriber<T> extends Subscriber<T> {
+    
+    private AsyncResponse ar;
+    
+    protected AbstractAsyncSubscriber(AsyncResponse ar) {
+        this.ar = ar;
+    }
+    public void resume(T response) {
+        ar.resume(response);
+    }
+    
+    public void resume(List<T> response) {
+        ar.resume(response);
+    }
+    
+    public void resume(StreamingResponse<T> response) {
+        ar.resume(response);
+    }
+
+    @Override
+    public void onError(Throwable t) {
+        ar.resume(t);
+    }
+    
+    protected AsyncResponse getAsyncResponse() {
+        return ar;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/6d82b75e/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/JsonStreamingAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/JsonStreamingAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/JsonStreamingAsyncSubscriber.java
new file mode 100644
index 0000000..b5c22a4
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/JsonStreamingAsyncSubscriber.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx;
+
+import javax.ws.rs.container.AsyncResponse;
+
+public class JsonStreamingAsyncSubscriber<T> extends StreamingAsyncSubscriber<T> {
+    public JsonStreamingAsyncSubscriber(AsyncResponse ar) {
+        this(ar, 1000);
+    }
+    public JsonStreamingAsyncSubscriber(AsyncResponse ar, long pollTimeout) {
+        this(ar, pollTimeout, 0);
+    }
+    public JsonStreamingAsyncSubscriber(AsyncResponse ar, long pollTimeout, long asyncTimeout) {
+        super(ar, "[", "]", ",", pollTimeout, asyncTimeout);
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/6d82b75e/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ListAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ListAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ListAsyncSubscriber.java
new file mode 100644
index 0000000..e94a861
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ListAsyncSubscriber.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.ws.rs.container.AsyncResponse;
+
+public class ListAsyncSubscriber<T> extends AbstractAsyncSubscriber<T> {
+    
+    private List<T> beans = new LinkedList<T>();
+    public ListAsyncSubscriber(AsyncResponse ar) {
+        super(ar);
+    }
+    @Override
+    public void onCompleted() {
+        super.resume(beans);
+    }
+
+    @Override
+    public void onNext(T bean) {
+        beans.add(bean);
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/6d82b75e/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ObservableReader.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ObservableReader.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ObservableReader.java
new file mode 100644
index 0000000..0e0780a
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ObservableReader.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Type;
+
+import javax.ws.rs.ProcessingException;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.ext.MessageBodyReader;
+import javax.ws.rs.ext.Providers;
+
+import org.apache.cxf.jaxrs.utils.InjectionUtils;
+
+import rx.Observable;
+
+public class ObservableReader<T> implements MessageBodyReader<Observable<T>> {
+    
+    @Context
+    private Providers providers;
+    
+    @Override
+    public boolean isReadable(Class<?> arg0, Type arg1, Annotation[] arg2, MediaType arg3) {
+        return true;
+    }
+
+    @Override
+    public Observable<T> readFrom(Class<Observable<T>> cls, Type t, Annotation[] anns, MediaType mt,
+                                  MultivaluedMap<String, String> headers, InputStream is)
+                                      throws IOException, WebApplicationException {
+        @SuppressWarnings("unchecked")
+        Class<T> actualCls = (Class<T>)InjectionUtils.getActualType(t);
+        final MessageBodyReader<T> mbr = 
+            (MessageBodyReader<T>)providers.getMessageBodyReader(actualCls, actualCls, anns, mt);
+        if (mbr == null) {
+            throw new ProcessingException("MBR is null");
+        }
+        return Observable.just(mbr.readFrom(actualCls, actualCls, anns, mt, headers, is));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/6d82b75e/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ObservableWriter.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ObservableWriter.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ObservableWriter.java
new file mode 100644
index 0000000..475d36b
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ObservableWriter.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Type;
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.ext.MessageBodyWriter;
+import javax.ws.rs.ext.Provider;
+import javax.ws.rs.ext.Providers;
+
+import org.apache.cxf.jaxrs.utils.ExceptionUtils;
+import org.apache.cxf.jaxrs.utils.ParameterizedCollectionType;
+
+import rx.Observable;
+
+@Provider
+public class ObservableWriter<T> implements MessageBodyWriter<Observable<T>> {
+    
+    @Context
+    private Providers providers;
+    private boolean writeSingleElementAsList;
+    
+    @Override
+    public long getSize(Observable<T> arg0, Class<?> arg1, Type arg2, Annotation[] arg3, MediaType arg4) {
+        // TODO Auto-generated method stub
+        return -1;
+    }
+
+    @Override
+    public boolean isWriteable(Class<?> arg0, Type arg1, Annotation[] arg2, MediaType arg3) {
+        return true;
+    }
+
+    @Override
+    public void writeTo(Observable<T> obs, Class<?> cls, Type t, Annotation[] anns, MediaType mt,
+                        MultivaluedMap<String, Object> headers, OutputStream os)
+                            throws IOException, WebApplicationException {
+        List<T> entities = new LinkedList<T>();
+        obs.subscribe(value -> entities.add(value),
+            throwable -> throwError(throwable));
+        if (!entities.isEmpty()) {
+            
+            if (entities.get(0) instanceof List) {
+                List<T> allEntities = new LinkedList<T>();
+                for (T obj : entities) {
+                    @SuppressWarnings("unchecked")
+                    List<T> listT = (List<T>)obj;
+                    allEntities.addAll(listT);
+                }
+                writeToOutputStream(allEntities, anns, mt, headers, os);
+            } else if (entities.size() > 1 || writeSingleElementAsList) {
+                writeToOutputStream(entities, anns, mt, headers, os);
+            } else {
+                writeToOutputStream(entities.get(0), anns, mt, headers, os);
+            }
+        }
+    }
+
+    private void writeToOutputStream(Object value,
+                                     Annotation[] anns,
+                                     MediaType mt,
+                                     MultivaluedMap<String, Object> headers, 
+                                     OutputStream os) {
+        Class<?> valueCls = value.getClass();
+        Type valueType = null;
+        if (value instanceof List) {
+            List<?> list = (List<?>)value;
+            valueType = new ParameterizedCollectionType(list.isEmpty() ? Object.class : list.get(0).getClass());
+        } else {
+            valueType = valueCls;
+        }
+        @SuppressWarnings("unchecked")
+        MessageBodyWriter<Object> writer = 
+            (MessageBodyWriter<Object>)providers.getMessageBodyWriter(valueCls, valueType, anns, mt);
+        if (writer == null) {
+            throwError(null);
+        }
+    
+        try {
+            writer.writeTo(value, valueCls, valueType, anns, mt, headers, os);    
+        } catch (IOException ex) {
+            throwError(ex);
+        }
+    }
+    
+    private static void throwError(Throwable cause) {
+        throw ExceptionUtils.toInternalServerErrorException(cause, null);
+    }
+
+    public void setWriteSingleElementAsList(boolean writeSingleElementAsList) {
+        this.writeSingleElementAsList = writeSingleElementAsList;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/6d82b75e/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/StreamingAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/StreamingAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/StreamingAsyncSubscriber.java
new file mode 100644
index 0000000..c531e98
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/StreamingAsyncSubscriber.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.TimeoutHandler;
+
+import org.apache.cxf.common.util.StringUtils;
+import org.apache.cxf.jaxrs.ext.StreamingResponse;
+
+public class StreamingAsyncSubscriber<T> extends AbstractAsyncSubscriber<T> {
+    
+    private BlockingQueue<T> queue = new LinkedBlockingQueue<T>();
+    private String openTag;
+    private String closeTag;
+    private String separator;
+    private long pollTimeout;
+    private long asyncTimeout;
+    private volatile boolean completed;
+    private AtomicBoolean firstWriteDone = new AtomicBoolean();
+    public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep) {
+        this(ar, openTag, closeTag, "", 1000);
+    }
+    public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep, 
+                                    long pollTimeout) {
+        this(ar, openTag, closeTag, sep, pollTimeout, 0);
+    }
+    public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep, 
+                                    long pollTimeout, long asyncTimeout) {
+        super(ar);
+        this.openTag = openTag;
+        this.closeTag = closeTag;
+        this.separator = sep;
+        this.pollTimeout = pollTimeout;
+        this.asyncTimeout = 0;
+        if (asyncTimeout > 0) {
+            ar.setTimeout(asyncTimeout, TimeUnit.MILLISECONDS);
+            ar.setTimeoutHandler(new TimeoutHandlerImpl());
+        }
+    }
+    @Override
+    public void onStart() {
+        if (asyncTimeout == 0) {
+            resumeAsyncResponse();
+        }
+    }
+    private void resumeAsyncResponse() {
+        super.resume(new StreamingResponseImpl());
+    }
+    @Override
+    public void onCompleted() {
+        completed = true;
+    }
+    
+    @Override
+    public void onNext(T bean) {
+        if (asyncTimeout > 0 && getAsyncResponse().isSuspended()) {
+            resumeAsyncResponse();
+        }
+        queue.add(bean);
+    }
+    private class StreamingResponseImpl implements StreamingResponse<T> {
+
+        @Override
+        public void writeTo(Writer<T> writer) throws IOException {
+            if (openTag != null) {
+                writer.getEntityStream().write(StringUtils.toBytesUTF8(openTag));
+            }
+            while (!completed || queue.size() > 0) {
+                try {
+                    T bean = queue.poll(pollTimeout, TimeUnit.MILLISECONDS);
+                    if (bean != null) {
+                        if (firstWriteDone.getAndSet(true)) {
+                            writer.getEntityStream().write(StringUtils.toBytesUTF8(separator));
+                        }
+                        writer.write(bean);
+                    }
+                } catch (InterruptedException ex) {
+                    // ignore
+                }
+            }
+            if (closeTag != null) {
+                writer.getEntityStream().write(StringUtils.toBytesUTF8(closeTag));
+            }
+            
+        }
+
+    }
+    public class TimeoutHandlerImpl implements TimeoutHandler {
+
+        @Override
+        public void handleTimeout(AsyncResponse asyncResponse) {
+            if (queue.isEmpty()) {
+                asyncResponse.setTimeout(asyncTimeout, TimeUnit.MILLISECONDS);
+            } else {
+                resumeAsyncResponse();
+            }
+
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/6d82b75e/rt/rs/extensions/rx/src/test/java/org/apache/cxf/jaxrs/rx/ObservableWriterTest.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/test/java/org/apache/cxf/jaxrs/rx/ObservableWriterTest.java b/rt/rs/extensions/rx/src/test/java/org/apache/cxf/jaxrs/rx/ObservableWriterTest.java
new file mode 100644
index 0000000..c6d0086
--- /dev/null
+++ b/rt/rs/extensions/rx/src/test/java/org/apache/cxf/jaxrs/rx/ObservableWriterTest.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.jaxrs.rx;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ObservableWriterTest extends Assert {
+
+    
+    @Test
+    public void testIsWriteable() {
+    }
+       
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/6d82b75e/rt/rs/pom.xml
----------------------------------------------------------------------
diff --git a/rt/rs/pom.xml b/rt/rs/pom.xml
index a374743..0348e4b 100644
--- a/rt/rs/pom.xml
+++ b/rt/rs/pom.xml
@@ -37,6 +37,7 @@
         <module>extensions/json-basic</module>
         <module>extensions/providers</module>
         <module>extensions/search</module>
+        <module>extensions/rx</module>
         <module>security</module>
     </modules>
 </project>

http://git-wip-us.apache.org/repos/asf/cxf/blob/6d82b75e/systests/jaxrs/pom.xml
----------------------------------------------------------------------
diff --git a/systests/jaxrs/pom.xml b/systests/jaxrs/pom.xml
index 17a19b1..fe9bc56 100644
--- a/systests/jaxrs/pom.xml
+++ b/systests/jaxrs/pom.xml
@@ -328,6 +328,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-rs-extension-rx</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
             <artifactId>cxf-rt-rs-security-cors</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/cxf/blob/6d82b75e/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java
index 45a9468..2f4c496 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java
@@ -29,7 +29,7 @@ import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
 
 import org.apache.cxf.jaxrs.client.WebClient;
 import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
-import org.apache.cxf.jaxrs.provider.rx.ObservableReader;
+import org.apache.cxf.jaxrs.rx.ObservableReader;
 import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
 
 import org.junit.BeforeClass;

http://git-wip-us.apache.org/repos/asf/cxf/blob/6d82b75e/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java
index d12641a..4915b71 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java
@@ -29,7 +29,7 @@ import org.apache.cxf.interceptor.LoggingOutInterceptor;
 import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
 import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
 import org.apache.cxf.jaxrs.provider.StreamingResponseProvider;
-import org.apache.cxf.jaxrs.provider.rx.ObservableWriter;
+import org.apache.cxf.jaxrs.rx.ObservableWriter;
 import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
 
     

http://git-wip-us.apache.org/repos/asf/cxf/blob/6d82b75e/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java
index 6081f2b..bac9472 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java
@@ -29,9 +29,9 @@ import javax.ws.rs.Produces;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
 
-import org.apache.cxf.jaxrs.provider.rx.AbstractAsyncSubscriber;
-import org.apache.cxf.jaxrs.provider.rx.JsonStreamingAsyncSubscriber;
-import org.apache.cxf.jaxrs.provider.rx.ListAsyncSubscriber;
+import org.apache.cxf.jaxrs.rx.AbstractAsyncSubscriber;
+import org.apache.cxf.jaxrs.rx.JsonStreamingAsyncSubscriber;
+import org.apache.cxf.jaxrs.rx.ListAsyncSubscriber;
 
 import rx.Observable;
 import rx.schedulers.Schedulers;