You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by se...@apache.org on 2016/09/02 15:27:57 UTC

cxf git commit: [CXF-6833] Package updates

Repository: cxf
Updated Branches:
  refs/heads/master 6d82b75eb -> 6fcdc7e9e


[CXF-6833] Package updates


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/6fcdc7e9
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/6fcdc7e9
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/6fcdc7e9

Branch: refs/heads/master
Commit: 6fcdc7e9eec2414a40bd3161cf237ef182e313a6
Parents: 6d82b75
Author: Sergey Beryozkin <sb...@gmail.com>
Authored: Fri Sep 2 16:27:40 2016 +0100
Committer: Sergey Beryozkin <sb...@gmail.com>
Committed: Fri Sep 2 16:27:40 2016 +0100

----------------------------------------------------------------------
 .../cxf/jaxrs/rx/AbstractAsyncSubscriber.java   |  56 ---------
 .../jaxrs/rx/JsonStreamingAsyncSubscriber.java  |  34 -----
 .../cxf/jaxrs/rx/ListAsyncSubscriber.java       |  42 -------
 .../apache/cxf/jaxrs/rx/ObservableReader.java   |  61 ---------
 .../apache/cxf/jaxrs/rx/ObservableWriter.java   | 119 ------------------
 .../cxf/jaxrs/rx/StreamingAsyncSubscriber.java  | 124 -------------------
 .../cxf/jaxrs/rx/provider/ObservableReader.java |  61 +++++++++
 .../cxf/jaxrs/rx/provider/ObservableWriter.java | 119 ++++++++++++++++++
 .../rx/server/AbstractAsyncSubscriber.java      |  56 +++++++++
 .../rx/server/JsonStreamingAsyncSubscriber.java |  34 +++++
 .../jaxrs/rx/server/ListAsyncSubscriber.java    |  42 +++++++
 .../rx/server/StreamingAsyncSubscriber.java     | 124 +++++++++++++++++++
 .../cxf/jaxrs/rx/ObservableWriterTest.java      |  32 -----
 .../jaxrs/rx/provider/ObservableWriterTest.java |  32 +++++
 .../jaxrs/reactive/JAXRSReactiveTest.java       |   2 +-
 .../systest/jaxrs/reactive/ReactiveServer.java  |   2 +-
 .../systest/jaxrs/reactive/ReactiveService.java |   6 +-
 17 files changed, 473 insertions(+), 473 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/AbstractAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/AbstractAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/AbstractAsyncSubscriber.java
deleted file mode 100644
index 80b1592..0000000
--- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/AbstractAsyncSubscriber.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.rx;
-
-import java.util.List;
-
-import javax.ws.rs.container.AsyncResponse;
-
-import org.apache.cxf.jaxrs.ext.StreamingResponse;
-
-import rx.Subscriber;
-
-public abstract class AbstractAsyncSubscriber<T> extends Subscriber<T> {
-    
-    private AsyncResponse ar;
-    
-    protected AbstractAsyncSubscriber(AsyncResponse ar) {
-        this.ar = ar;
-    }
-    public void resume(T response) {
-        ar.resume(response);
-    }
-    
-    public void resume(List<T> response) {
-        ar.resume(response);
-    }
-    
-    public void resume(StreamingResponse<T> response) {
-        ar.resume(response);
-    }
-
-    @Override
-    public void onError(Throwable t) {
-        ar.resume(t);
-    }
-    
-    protected AsyncResponse getAsyncResponse() {
-        return ar;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/JsonStreamingAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/JsonStreamingAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/JsonStreamingAsyncSubscriber.java
deleted file mode 100644
index b5c22a4..0000000
--- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/JsonStreamingAsyncSubscriber.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.rx;
-
-import javax.ws.rs.container.AsyncResponse;
-
-public class JsonStreamingAsyncSubscriber<T> extends StreamingAsyncSubscriber<T> {
-    public JsonStreamingAsyncSubscriber(AsyncResponse ar) {
-        this(ar, 1000);
-    }
-    public JsonStreamingAsyncSubscriber(AsyncResponse ar, long pollTimeout) {
-        this(ar, pollTimeout, 0);
-    }
-    public JsonStreamingAsyncSubscriber(AsyncResponse ar, long pollTimeout, long asyncTimeout) {
-        super(ar, "[", "]", ",", pollTimeout, asyncTimeout);
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ListAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ListAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ListAsyncSubscriber.java
deleted file mode 100644
index e94a861..0000000
--- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ListAsyncSubscriber.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.rx;
-
-import java.util.LinkedList;
-import java.util.List;
-
-import javax.ws.rs.container.AsyncResponse;
-
-public class ListAsyncSubscriber<T> extends AbstractAsyncSubscriber<T> {
-    
-    private List<T> beans = new LinkedList<T>();
-    public ListAsyncSubscriber(AsyncResponse ar) {
-        super(ar);
-    }
-    @Override
-    public void onCompleted() {
-        super.resume(beans);
-    }
-
-    @Override
-    public void onNext(T bean) {
-        beans.add(bean);
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ObservableReader.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ObservableReader.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ObservableReader.java
deleted file mode 100644
index 0e0780a..0000000
--- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ObservableReader.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.rx;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.lang.annotation.Annotation;
-import java.lang.reflect.Type;
-
-import javax.ws.rs.ProcessingException;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.MultivaluedMap;
-import javax.ws.rs.ext.MessageBodyReader;
-import javax.ws.rs.ext.Providers;
-
-import org.apache.cxf.jaxrs.utils.InjectionUtils;
-
-import rx.Observable;
-
-public class ObservableReader<T> implements MessageBodyReader<Observable<T>> {
-    
-    @Context
-    private Providers providers;
-    
-    @Override
-    public boolean isReadable(Class<?> arg0, Type arg1, Annotation[] arg2, MediaType arg3) {
-        return true;
-    }
-
-    @Override
-    public Observable<T> readFrom(Class<Observable<T>> cls, Type t, Annotation[] anns, MediaType mt,
-                                  MultivaluedMap<String, String> headers, InputStream is)
-                                      throws IOException, WebApplicationException {
-        @SuppressWarnings("unchecked")
-        Class<T> actualCls = (Class<T>)InjectionUtils.getActualType(t);
-        final MessageBodyReader<T> mbr = 
-            (MessageBodyReader<T>)providers.getMessageBodyReader(actualCls, actualCls, anns, mt);
-        if (mbr == null) {
-            throw new ProcessingException("MBR is null");
-        }
-        return Observable.just(mbr.readFrom(actualCls, actualCls, anns, mt, headers, is));
-    }
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ObservableWriter.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ObservableWriter.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ObservableWriter.java
deleted file mode 100644
index 475d36b..0000000
--- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ObservableWriter.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.rx;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.lang.annotation.Annotation;
-import java.lang.reflect.Type;
-import java.util.LinkedList;
-import java.util.List;
-
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.MultivaluedMap;
-import javax.ws.rs.ext.MessageBodyWriter;
-import javax.ws.rs.ext.Provider;
-import javax.ws.rs.ext.Providers;
-
-import org.apache.cxf.jaxrs.utils.ExceptionUtils;
-import org.apache.cxf.jaxrs.utils.ParameterizedCollectionType;
-
-import rx.Observable;
-
-@Provider
-public class ObservableWriter<T> implements MessageBodyWriter<Observable<T>> {
-    
-    @Context
-    private Providers providers;
-    private boolean writeSingleElementAsList;
-    
-    @Override
-    public long getSize(Observable<T> arg0, Class<?> arg1, Type arg2, Annotation[] arg3, MediaType arg4) {
-        // TODO Auto-generated method stub
-        return -1;
-    }
-
-    @Override
-    public boolean isWriteable(Class<?> arg0, Type arg1, Annotation[] arg2, MediaType arg3) {
-        return true;
-    }
-
-    @Override
-    public void writeTo(Observable<T> obs, Class<?> cls, Type t, Annotation[] anns, MediaType mt,
-                        MultivaluedMap<String, Object> headers, OutputStream os)
-                            throws IOException, WebApplicationException {
-        List<T> entities = new LinkedList<T>();
-        obs.subscribe(value -> entities.add(value),
-            throwable -> throwError(throwable));
-        if (!entities.isEmpty()) {
-            
-            if (entities.get(0) instanceof List) {
-                List<T> allEntities = new LinkedList<T>();
-                for (T obj : entities) {
-                    @SuppressWarnings("unchecked")
-                    List<T> listT = (List<T>)obj;
-                    allEntities.addAll(listT);
-                }
-                writeToOutputStream(allEntities, anns, mt, headers, os);
-            } else if (entities.size() > 1 || writeSingleElementAsList) {
-                writeToOutputStream(entities, anns, mt, headers, os);
-            } else {
-                writeToOutputStream(entities.get(0), anns, mt, headers, os);
-            }
-        }
-    }
-
-    private void writeToOutputStream(Object value,
-                                     Annotation[] anns,
-                                     MediaType mt,
-                                     MultivaluedMap<String, Object> headers, 
-                                     OutputStream os) {
-        Class<?> valueCls = value.getClass();
-        Type valueType = null;
-        if (value instanceof List) {
-            List<?> list = (List<?>)value;
-            valueType = new ParameterizedCollectionType(list.isEmpty() ? Object.class : list.get(0).getClass());
-        } else {
-            valueType = valueCls;
-        }
-        @SuppressWarnings("unchecked")
-        MessageBodyWriter<Object> writer = 
-            (MessageBodyWriter<Object>)providers.getMessageBodyWriter(valueCls, valueType, anns, mt);
-        if (writer == null) {
-            throwError(null);
-        }
-    
-        try {
-            writer.writeTo(value, valueCls, valueType, anns, mt, headers, os);    
-        } catch (IOException ex) {
-            throwError(ex);
-        }
-    }
-    
-    private static void throwError(Throwable cause) {
-        throw ExceptionUtils.toInternalServerErrorException(cause, null);
-    }
-
-    public void setWriteSingleElementAsList(boolean writeSingleElementAsList) {
-        this.writeSingleElementAsList = writeSingleElementAsList;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/StreamingAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/StreamingAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/StreamingAsyncSubscriber.java
deleted file mode 100644
index c531e98..0000000
--- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/StreamingAsyncSubscriber.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.rx;
-
-import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.ws.rs.container.AsyncResponse;
-import javax.ws.rs.container.TimeoutHandler;
-
-import org.apache.cxf.common.util.StringUtils;
-import org.apache.cxf.jaxrs.ext.StreamingResponse;
-
-public class StreamingAsyncSubscriber<T> extends AbstractAsyncSubscriber<T> {
-    
-    private BlockingQueue<T> queue = new LinkedBlockingQueue<T>();
-    private String openTag;
-    private String closeTag;
-    private String separator;
-    private long pollTimeout;
-    private long asyncTimeout;
-    private volatile boolean completed;
-    private AtomicBoolean firstWriteDone = new AtomicBoolean();
-    public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep) {
-        this(ar, openTag, closeTag, "", 1000);
-    }
-    public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep, 
-                                    long pollTimeout) {
-        this(ar, openTag, closeTag, sep, pollTimeout, 0);
-    }
-    public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep, 
-                                    long pollTimeout, long asyncTimeout) {
-        super(ar);
-        this.openTag = openTag;
-        this.closeTag = closeTag;
-        this.separator = sep;
-        this.pollTimeout = pollTimeout;
-        this.asyncTimeout = 0;
-        if (asyncTimeout > 0) {
-            ar.setTimeout(asyncTimeout, TimeUnit.MILLISECONDS);
-            ar.setTimeoutHandler(new TimeoutHandlerImpl());
-        }
-    }
-    @Override
-    public void onStart() {
-        if (asyncTimeout == 0) {
-            resumeAsyncResponse();
-        }
-    }
-    private void resumeAsyncResponse() {
-        super.resume(new StreamingResponseImpl());
-    }
-    @Override
-    public void onCompleted() {
-        completed = true;
-    }
-    
-    @Override
-    public void onNext(T bean) {
-        if (asyncTimeout > 0 && getAsyncResponse().isSuspended()) {
-            resumeAsyncResponse();
-        }
-        queue.add(bean);
-    }
-    private class StreamingResponseImpl implements StreamingResponse<T> {
-
-        @Override
-        public void writeTo(Writer<T> writer) throws IOException {
-            if (openTag != null) {
-                writer.getEntityStream().write(StringUtils.toBytesUTF8(openTag));
-            }
-            while (!completed || queue.size() > 0) {
-                try {
-                    T bean = queue.poll(pollTimeout, TimeUnit.MILLISECONDS);
-                    if (bean != null) {
-                        if (firstWriteDone.getAndSet(true)) {
-                            writer.getEntityStream().write(StringUtils.toBytesUTF8(separator));
-                        }
-                        writer.write(bean);
-                    }
-                } catch (InterruptedException ex) {
-                    // ignore
-                }
-            }
-            if (closeTag != null) {
-                writer.getEntityStream().write(StringUtils.toBytesUTF8(closeTag));
-            }
-            
-        }
-
-    }
-    public class TimeoutHandlerImpl implements TimeoutHandler {
-
-        @Override
-        public void handleTimeout(AsyncResponse asyncResponse) {
-            if (queue.isEmpty()) {
-                asyncResponse.setTimeout(asyncTimeout, TimeUnit.MILLISECONDS);
-            } else {
-                resumeAsyncResponse();
-            }
-
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/provider/ObservableReader.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/provider/ObservableReader.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/provider/ObservableReader.java
new file mode 100644
index 0000000..f05f478
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/provider/ObservableReader.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx.provider;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Type;
+
+import javax.ws.rs.ProcessingException;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.ext.MessageBodyReader;
+import javax.ws.rs.ext.Providers;
+
+import org.apache.cxf.jaxrs.utils.InjectionUtils;
+
+import rx.Observable;
+
+public class ObservableReader<T> implements MessageBodyReader<Observable<T>> {
+    
+    @Context
+    private Providers providers;
+    
+    @Override
+    public boolean isReadable(Class<?> arg0, Type arg1, Annotation[] arg2, MediaType arg3) {
+        return true;
+    }
+
+    @Override
+    public Observable<T> readFrom(Class<Observable<T>> cls, Type t, Annotation[] anns, MediaType mt,
+                                  MultivaluedMap<String, String> headers, InputStream is)
+                                      throws IOException, WebApplicationException {
+        @SuppressWarnings("unchecked")
+        Class<T> actualCls = (Class<T>)InjectionUtils.getActualType(t);
+        final MessageBodyReader<T> mbr = 
+            (MessageBodyReader<T>)providers.getMessageBodyReader(actualCls, actualCls, anns, mt);
+        if (mbr == null) {
+            throw new ProcessingException("MBR is null");
+        }
+        return Observable.just(mbr.readFrom(actualCls, actualCls, anns, mt, headers, is));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/provider/ObservableWriter.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/provider/ObservableWriter.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/provider/ObservableWriter.java
new file mode 100644
index 0000000..b4ed2af
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/provider/ObservableWriter.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx.provider;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Type;
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.ext.MessageBodyWriter;
+import javax.ws.rs.ext.Provider;
+import javax.ws.rs.ext.Providers;
+
+import org.apache.cxf.jaxrs.utils.ExceptionUtils;
+import org.apache.cxf.jaxrs.utils.ParameterizedCollectionType;
+
+import rx.Observable;
+
+@Provider
+public class ObservableWriter<T> implements MessageBodyWriter<Observable<T>> {
+    
+    @Context
+    private Providers providers;
+    private boolean writeSingleElementAsList;
+    
+    @Override
+    public long getSize(Observable<T> arg0, Class<?> arg1, Type arg2, Annotation[] arg3, MediaType arg4) {
+        // TODO Auto-generated method stub
+        return -1;
+    }
+
+    @Override
+    public boolean isWriteable(Class<?> arg0, Type arg1, Annotation[] arg2, MediaType arg3) {
+        return true;
+    }
+
+    @Override
+    public void writeTo(Observable<T> obs, Class<?> cls, Type t, Annotation[] anns, MediaType mt,
+                        MultivaluedMap<String, Object> headers, OutputStream os)
+                            throws IOException, WebApplicationException {
+        List<T> entities = new LinkedList<T>();
+        obs.subscribe(value -> entities.add(value),
+            throwable -> throwError(throwable));
+        if (!entities.isEmpty()) {
+            
+            if (entities.get(0) instanceof List) {
+                List<T> allEntities = new LinkedList<T>();
+                for (T obj : entities) {
+                    @SuppressWarnings("unchecked")
+                    List<T> listT = (List<T>)obj;
+                    allEntities.addAll(listT);
+                }
+                writeToOutputStream(allEntities, anns, mt, headers, os);
+            } else if (entities.size() > 1 || writeSingleElementAsList) {
+                writeToOutputStream(entities, anns, mt, headers, os);
+            } else {
+                writeToOutputStream(entities.get(0), anns, mt, headers, os);
+            }
+        }
+    }
+
+    private void writeToOutputStream(Object value,
+                                     Annotation[] anns,
+                                     MediaType mt,
+                                     MultivaluedMap<String, Object> headers, 
+                                     OutputStream os) {
+        Class<?> valueCls = value.getClass();
+        Type valueType = null;
+        if (value instanceof List) {
+            List<?> list = (List<?>)value;
+            valueType = new ParameterizedCollectionType(list.isEmpty() ? Object.class : list.get(0).getClass());
+        } else {
+            valueType = valueCls;
+        }
+        @SuppressWarnings("unchecked")
+        MessageBodyWriter<Object> writer = 
+            (MessageBodyWriter<Object>)providers.getMessageBodyWriter(valueCls, valueType, anns, mt);
+        if (writer == null) {
+            throwError(null);
+        }
+    
+        try {
+            writer.writeTo(value, valueCls, valueType, anns, mt, headers, os);    
+        } catch (IOException ex) {
+            throwError(ex);
+        }
+    }
+    
+    private static void throwError(Throwable cause) {
+        throw ExceptionUtils.toInternalServerErrorException(cause, null);
+    }
+
+    public void setWriteSingleElementAsList(boolean writeSingleElementAsList) {
+        this.writeSingleElementAsList = writeSingleElementAsList;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/AbstractAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/AbstractAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/AbstractAsyncSubscriber.java
new file mode 100644
index 0000000..94a85fc
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/AbstractAsyncSubscriber.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx.server;
+
+import java.util.List;
+
+import javax.ws.rs.container.AsyncResponse;
+
+import org.apache.cxf.jaxrs.ext.StreamingResponse;
+
+import rx.Subscriber;
+
+public abstract class AbstractAsyncSubscriber<T> extends Subscriber<T> {
+    
+    private AsyncResponse ar;
+    
+    protected AbstractAsyncSubscriber(AsyncResponse ar) {
+        this.ar = ar;
+    }
+    public void resume(T response) {
+        ar.resume(response);
+    }
+    
+    public void resume(List<T> response) {
+        ar.resume(response);
+    }
+    
+    public void resume(StreamingResponse<T> response) {
+        ar.resume(response);
+    }
+
+    @Override
+    public void onError(Throwable t) {
+        ar.resume(t);
+    }
+    
+    protected AsyncResponse getAsyncResponse() {
+        return ar;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/JsonStreamingAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/JsonStreamingAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/JsonStreamingAsyncSubscriber.java
new file mode 100644
index 0000000..c71c8c1
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/JsonStreamingAsyncSubscriber.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx.server;
+
+import javax.ws.rs.container.AsyncResponse;
+
+public class JsonStreamingAsyncSubscriber<T> extends StreamingAsyncSubscriber<T> {
+    public JsonStreamingAsyncSubscriber(AsyncResponse ar) {
+        this(ar, 1000);
+    }
+    public JsonStreamingAsyncSubscriber(AsyncResponse ar, long pollTimeout) {
+        this(ar, pollTimeout, 0);
+    }
+    public JsonStreamingAsyncSubscriber(AsyncResponse ar, long pollTimeout, long asyncTimeout) {
+        super(ar, "[", "]", ",", pollTimeout, asyncTimeout);
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/ListAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/ListAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/ListAsyncSubscriber.java
new file mode 100644
index 0000000..5be73e5
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/ListAsyncSubscriber.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx.server;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.ws.rs.container.AsyncResponse;
+
+public class ListAsyncSubscriber<T> extends AbstractAsyncSubscriber<T> {
+    
+    private List<T> beans = new LinkedList<T>();
+    public ListAsyncSubscriber(AsyncResponse ar) {
+        super(ar);
+    }
+    @Override
+    public void onCompleted() {
+        super.resume(beans);
+    }
+
+    @Override
+    public void onNext(T bean) {
+        beans.add(bean);
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/StreamingAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/StreamingAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/StreamingAsyncSubscriber.java
new file mode 100644
index 0000000..bd16292
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/StreamingAsyncSubscriber.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx.server;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.TimeoutHandler;
+
+import org.apache.cxf.common.util.StringUtils;
+import org.apache.cxf.jaxrs.ext.StreamingResponse;
+
+public class StreamingAsyncSubscriber<T> extends AbstractAsyncSubscriber<T> {
+    
+    private BlockingQueue<T> queue = new LinkedBlockingQueue<T>();
+    private String openTag;
+    private String closeTag;
+    private String separator;
+    private long pollTimeout;
+    private long asyncTimeout;
+    private volatile boolean completed;
+    private AtomicBoolean firstWriteDone = new AtomicBoolean();
+    public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep) {
+        this(ar, openTag, closeTag, "", 1000);
+    }
+    public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep, 
+                                    long pollTimeout) {
+        this(ar, openTag, closeTag, sep, pollTimeout, 0);
+    }
+    public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep, 
+                                    long pollTimeout, long asyncTimeout) {
+        super(ar);
+        this.openTag = openTag;
+        this.closeTag = closeTag;
+        this.separator = sep;
+        this.pollTimeout = pollTimeout;
+        this.asyncTimeout = 0;
+        if (asyncTimeout > 0) {
+            ar.setTimeout(asyncTimeout, TimeUnit.MILLISECONDS);
+            ar.setTimeoutHandler(new TimeoutHandlerImpl());
+        }
+    }
+    @Override
+    public void onStart() {
+        if (asyncTimeout == 0) {
+            resumeAsyncResponse();
+        }
+    }
+    private void resumeAsyncResponse() {
+        super.resume(new StreamingResponseImpl());
+    }
+    @Override
+    public void onCompleted() {
+        completed = true;
+    }
+    
+    @Override
+    public void onNext(T bean) {
+        if (asyncTimeout > 0 && getAsyncResponse().isSuspended()) {
+            resumeAsyncResponse();
+        }
+        queue.add(bean);
+    }
+    private class StreamingResponseImpl implements StreamingResponse<T> {
+
+        @Override
+        public void writeTo(Writer<T> writer) throws IOException {
+            if (openTag != null) {
+                writer.getEntityStream().write(StringUtils.toBytesUTF8(openTag));
+            }
+            while (!completed || queue.size() > 0) {
+                try {
+                    T bean = queue.poll(pollTimeout, TimeUnit.MILLISECONDS);
+                    if (bean != null) {
+                        if (firstWriteDone.getAndSet(true)) {
+                            writer.getEntityStream().write(StringUtils.toBytesUTF8(separator));
+                        }
+                        writer.write(bean);
+                    }
+                } catch (InterruptedException ex) {
+                    // ignore
+                }
+            }
+            if (closeTag != null) {
+                writer.getEntityStream().write(StringUtils.toBytesUTF8(closeTag));
+            }
+            
+        }
+
+    }
+    public class TimeoutHandlerImpl implements TimeoutHandler {
+
+        @Override
+        public void handleTimeout(AsyncResponse asyncResponse) {
+            if (queue.isEmpty()) {
+                asyncResponse.setTimeout(asyncTimeout, TimeUnit.MILLISECONDS);
+            } else {
+                resumeAsyncResponse();
+            }
+
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/rt/rs/extensions/rx/src/test/java/org/apache/cxf/jaxrs/rx/ObservableWriterTest.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/test/java/org/apache/cxf/jaxrs/rx/ObservableWriterTest.java b/rt/rs/extensions/rx/src/test/java/org/apache/cxf/jaxrs/rx/ObservableWriterTest.java
deleted file mode 100644
index c6d0086..0000000
--- a/rt/rs/extensions/rx/src/test/java/org/apache/cxf/jaxrs/rx/ObservableWriterTest.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cxf.jaxrs.rx;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-public class ObservableWriterTest extends Assert {
-
-    
-    @Test
-    public void testIsWriteable() {
-    }
-       
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/rt/rs/extensions/rx/src/test/java/org/apache/cxf/jaxrs/rx/provider/ObservableWriterTest.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/test/java/org/apache/cxf/jaxrs/rx/provider/ObservableWriterTest.java b/rt/rs/extensions/rx/src/test/java/org/apache/cxf/jaxrs/rx/provider/ObservableWriterTest.java
new file mode 100644
index 0000000..049377b
--- /dev/null
+++ b/rt/rs/extensions/rx/src/test/java/org/apache/cxf/jaxrs/rx/provider/ObservableWriterTest.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.jaxrs.rx.provider;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ObservableWriterTest extends Assert {
+
+    
+    @Test
+    public void testIsWriteable() {
+    }
+       
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java
index 2f4c496..59da436 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java
@@ -29,7 +29,7 @@ import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
 
 import org.apache.cxf.jaxrs.client.WebClient;
 import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
-import org.apache.cxf.jaxrs.rx.ObservableReader;
+import org.apache.cxf.jaxrs.rx.provider.ObservableReader;
 import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
 
 import org.junit.BeforeClass;

http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java
index 4915b71..afd5cbb 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java
@@ -29,7 +29,7 @@ import org.apache.cxf.interceptor.LoggingOutInterceptor;
 import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
 import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
 import org.apache.cxf.jaxrs.provider.StreamingResponseProvider;
-import org.apache.cxf.jaxrs.rx.ObservableWriter;
+import org.apache.cxf.jaxrs.rx.provider.ObservableWriter;
 import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
 
     

http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java
index bac9472..9c9ae08 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java
@@ -29,9 +29,9 @@ import javax.ws.rs.Produces;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
 
-import org.apache.cxf.jaxrs.rx.AbstractAsyncSubscriber;
-import org.apache.cxf.jaxrs.rx.JsonStreamingAsyncSubscriber;
-import org.apache.cxf.jaxrs.rx.ListAsyncSubscriber;
+import org.apache.cxf.jaxrs.rx.server.AbstractAsyncSubscriber;
+import org.apache.cxf.jaxrs.rx.server.JsonStreamingAsyncSubscriber;
+import org.apache.cxf.jaxrs.rx.server.ListAsyncSubscriber;
 
 import rx.Observable;
 import rx.schedulers.Schedulers;