You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2016/09/05 01:10:18 UTC
[11/23] cxf git commit: [CXF-6833] Moving RxJava code into its own
module
[CXF-6833] Moving RxJava code into its own module
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/6d82b75e
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/6d82b75e
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/6d82b75e
Branch: refs/heads/master-jaxrs-2.1
Commit: 6d82b75eb982f13f5f071178aeef66e55b0e3fd4
Parents: 33cb4b0
Author: Sergey Beryozkin <sb...@gmail.com>
Authored: Fri Sep 2 16:19:56 2016 +0100
Committer: Sergey Beryozkin <sb...@gmail.com>
Committed: Fri Sep 2 16:19:56 2016 +0100
----------------------------------------------------------------------
rt/rs/extensions/providers/pom.xml | 7 --
.../provider/rx/AbstractAsyncSubscriber.java | 56 ---------
.../rx/JsonStreamingAsyncSubscriber.java | 34 -----
.../jaxrs/provider/rx/ListAsyncSubscriber.java | 42 -------
.../cxf/jaxrs/provider/rx/ObservableReader.java | 61 ---------
.../cxf/jaxrs/provider/rx/ObservableWriter.java | 119 ------------------
.../provider/rx/StreamingAsyncSubscriber.java | 125 -------------------
rt/rs/extensions/rx/pom.xml | 53 ++++++++
.../cxf/jaxrs/rx/AbstractAsyncSubscriber.java | 56 +++++++++
.../jaxrs/rx/JsonStreamingAsyncSubscriber.java | 34 +++++
.../cxf/jaxrs/rx/ListAsyncSubscriber.java | 42 +++++++
.../apache/cxf/jaxrs/rx/ObservableReader.java | 61 +++++++++
.../apache/cxf/jaxrs/rx/ObservableWriter.java | 119 ++++++++++++++++++
.../cxf/jaxrs/rx/StreamingAsyncSubscriber.java | 124 ++++++++++++++++++
.../cxf/jaxrs/rx/ObservableWriterTest.java | 32 +++++
rt/rs/pom.xml | 1 +
systests/jaxrs/pom.xml | 5 +
.../jaxrs/reactive/JAXRSReactiveTest.java | 2 +-
.../systest/jaxrs/reactive/ReactiveServer.java | 2 +-
.../systest/jaxrs/reactive/ReactiveService.java | 6 +-
20 files changed, 532 insertions(+), 449 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cxf/blob/6d82b75e/rt/rs/extensions/providers/pom.xml
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/providers/pom.xml b/rt/rs/extensions/providers/pom.xml
index e37182c..b379e7c 100644
--- a/rt/rs/extensions/providers/pom.xml
+++ b/rt/rs/extensions/providers/pom.xml
@@ -120,13 +120,6 @@
<optional>true</optional>
</dependency>
<dependency>
- <groupId>io.reactivex</groupId>
- <artifactId>rxjava</artifactId>
- <version>${cxf.rx.java.version}</version>
- <scope>provided</scope>
- <optional>true</optional>
- </dependency>
- <dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-bindings-soap</artifactId>
<version>${project.version}</version>
http://git-wip-us.apache.org/repos/asf/cxf/blob/6d82b75e/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/AbstractAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/AbstractAsyncSubscriber.java b/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/AbstractAsyncSubscriber.java
deleted file mode 100644
index c49144f..0000000
--- a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/AbstractAsyncSubscriber.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.provider.rx;
-
-import java.util.List;
-
-import javax.ws.rs.container.AsyncResponse;
-
-import org.apache.cxf.jaxrs.ext.StreamingResponse;
-
-import rx.Subscriber;
-
-public abstract class AbstractAsyncSubscriber<T> extends Subscriber<T> {
-
- private AsyncResponse ar;
-
- protected AbstractAsyncSubscriber(AsyncResponse ar) {
- this.ar = ar;
- }
- public void resume(T response) {
- ar.resume(response);
- }
-
- public void resume(List<T> response) {
- ar.resume(response);
- }
-
- public void resume(StreamingResponse<T> response) {
- ar.resume(response);
- }
-
- @Override
- public void onError(Throwable t) {
- ar.resume(t);
- }
-
- protected AsyncResponse getAsyncResponse() {
- return ar;
- }
-}
http://git-wip-us.apache.org/repos/asf/cxf/blob/6d82b75e/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/JsonStreamingAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/JsonStreamingAsyncSubscriber.java b/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/JsonStreamingAsyncSubscriber.java
deleted file mode 100644
index 27384b1..0000000
--- a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/JsonStreamingAsyncSubscriber.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.provider.rx;
-
-import javax.ws.rs.container.AsyncResponse;
-
-public class JsonStreamingAsyncSubscriber<T> extends StreamingAsyncSubscriber<T> {
- public JsonStreamingAsyncSubscriber(AsyncResponse ar) {
- this(ar, 1000);
- }
- public JsonStreamingAsyncSubscriber(AsyncResponse ar, long pollTimeout) {
- this(ar, pollTimeout, 0);
- }
- public JsonStreamingAsyncSubscriber(AsyncResponse ar, long pollTimeout, long asyncTimeout) {
- super(ar, "[", "]", ",", pollTimeout, asyncTimeout);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cxf/blob/6d82b75e/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ListAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ListAsyncSubscriber.java b/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ListAsyncSubscriber.java
deleted file mode 100644
index 6bfb1cb..0000000
--- a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ListAsyncSubscriber.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.provider.rx;
-
-import java.util.LinkedList;
-import java.util.List;
-
-import javax.ws.rs.container.AsyncResponse;
-
-public class ListAsyncSubscriber<T> extends AbstractAsyncSubscriber<T> {
-
- private List<T> beans = new LinkedList<T>();
- public ListAsyncSubscriber(AsyncResponse ar) {
- super(ar);
- }
- @Override
- public void onCompleted() {
- super.resume(beans);
- }
-
- @Override
- public void onNext(T bean) {
- beans.add(bean);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cxf/blob/6d82b75e/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ObservableReader.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ObservableReader.java b/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ObservableReader.java
deleted file mode 100644
index 8a63311..0000000
--- a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ObservableReader.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.provider.rx;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.lang.annotation.Annotation;
-import java.lang.reflect.Type;
-
-import javax.ws.rs.ProcessingException;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.MultivaluedMap;
-import javax.ws.rs.ext.MessageBodyReader;
-import javax.ws.rs.ext.Providers;
-
-import org.apache.cxf.jaxrs.utils.InjectionUtils;
-
-import rx.Observable;
-
-public class ObservableReader<T> implements MessageBodyReader<Observable<T>> {
-
- @Context
- private Providers providers;
-
- @Override
- public boolean isReadable(Class<?> arg0, Type arg1, Annotation[] arg2, MediaType arg3) {
- return true;
- }
-
- @Override
- public Observable<T> readFrom(Class<Observable<T>> cls, Type t, Annotation[] anns, MediaType mt,
- MultivaluedMap<String, String> headers, InputStream is)
- throws IOException, WebApplicationException {
- @SuppressWarnings("unchecked")
- Class<T> actualCls = (Class<T>)InjectionUtils.getActualType(t);
- final MessageBodyReader<T> mbr =
- (MessageBodyReader<T>)providers.getMessageBodyReader(actualCls, actualCls, anns, mt);
- if (mbr == null) {
- throw new ProcessingException("MBR is null");
- }
- return Observable.just(mbr.readFrom(actualCls, actualCls, anns, mt, headers, is));
- }
-}
http://git-wip-us.apache.org/repos/asf/cxf/blob/6d82b75e/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ObservableWriter.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ObservableWriter.java b/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ObservableWriter.java
deleted file mode 100644
index 6317506..0000000
--- a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ObservableWriter.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.provider.rx;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.lang.annotation.Annotation;
-import java.lang.reflect.Type;
-import java.util.LinkedList;
-import java.util.List;
-
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.MultivaluedMap;
-import javax.ws.rs.ext.MessageBodyWriter;
-import javax.ws.rs.ext.Provider;
-import javax.ws.rs.ext.Providers;
-
-import org.apache.cxf.jaxrs.utils.ExceptionUtils;
-import org.apache.cxf.jaxrs.utils.ParameterizedCollectionType;
-
-import rx.Observable;
-
-@Provider
-public class ObservableWriter<T> implements MessageBodyWriter<Observable<T>> {
-
- @Context
- private Providers providers;
- private boolean writeSingleElementAsList;
-
- @Override
- public long getSize(Observable<T> arg0, Class<?> arg1, Type arg2, Annotation[] arg3, MediaType arg4) {
- // TODO Auto-generated method stub
- return -1;
- }
-
- @Override
- public boolean isWriteable(Class<?> arg0, Type arg1, Annotation[] arg2, MediaType arg3) {
- return true;
- }
-
- @Override
- public void writeTo(Observable<T> obs, Class<?> cls, Type t, Annotation[] anns, MediaType mt,
- MultivaluedMap<String, Object> headers, OutputStream os)
- throws IOException, WebApplicationException {
- List<T> entities = new LinkedList<T>();
- obs.subscribe(value -> entities.add(value),
- throwable -> throwError(throwable));
- if (!entities.isEmpty()) {
-
- if (entities.get(0) instanceof List) {
- List<T> allEntities = new LinkedList<T>();
- for (T obj : entities) {
- @SuppressWarnings("unchecked")
- List<T> listT = (List<T>)obj;
- allEntities.addAll(listT);
- }
- writeToOutputStream(allEntities, anns, mt, headers, os);
- } else if (entities.size() > 1 || writeSingleElementAsList) {
- writeToOutputStream(entities, anns, mt, headers, os);
- } else {
- writeToOutputStream(entities.get(0), anns, mt, headers, os);
- }
- }
- }
-
- private void writeToOutputStream(Object value,
- Annotation[] anns,
- MediaType mt,
- MultivaluedMap<String, Object> headers,
- OutputStream os) {
- Class<?> valueCls = value.getClass();
- Type valueType = null;
- if (value instanceof List) {
- List<?> list = (List<?>)value;
- valueType = new ParameterizedCollectionType(list.isEmpty() ? Object.class : list.get(0).getClass());
- } else {
- valueType = valueCls;
- }
- @SuppressWarnings("unchecked")
- MessageBodyWriter<Object> writer =
- (MessageBodyWriter<Object>)providers.getMessageBodyWriter(valueCls, valueType, anns, mt);
- if (writer == null) {
- throwError(null);
- }
-
- try {
- writer.writeTo(value, valueCls, valueType, anns, mt, headers, os);
- } catch (IOException ex) {
- throwError(ex);
- }
- }
-
- private static void throwError(Throwable cause) {
- throw ExceptionUtils.toInternalServerErrorException(cause, null);
- }
-
- public void setWriteSingleElementAsList(boolean writeSingleElementAsList) {
- this.writeSingleElementAsList = writeSingleElementAsList;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cxf/blob/6d82b75e/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/StreamingAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/StreamingAsyncSubscriber.java b/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/StreamingAsyncSubscriber.java
deleted file mode 100644
index 48a5ac4..0000000
--- a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/StreamingAsyncSubscriber.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.provider.rx;
-
-import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import javax.ws.rs.container.AsyncResponse;
-import javax.ws.rs.container.TimeoutHandler;
-
-import org.apache.commons.codec.binary.StringUtils;
-import org.apache.cxf.jaxrs.ext.StreamingResponse;
-
-public class StreamingAsyncSubscriber<T> extends AbstractAsyncSubscriber<T> {
-
- private BlockingQueue<T> queue = new LinkedBlockingQueue<T>();
- private String openTag;
- private String closeTag;
- private String separator;
- private long pollTimeout;
- private long asyncTimeout;
- private volatile boolean completed;
- private volatile boolean firstWriteDone;
- public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep) {
- this(ar, openTag, closeTag, "", 1000);
- }
- public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep,
- long pollTimeout) {
- this(ar, openTag, closeTag, sep, pollTimeout, 0);
- }
- public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep,
- long pollTimeout, long asyncTimeout) {
- super(ar);
- this.openTag = openTag;
- this.closeTag = closeTag;
- this.separator = sep;
- this.pollTimeout = pollTimeout;
- this.asyncTimeout = 0;
- if (asyncTimeout > 0) {
- ar.setTimeout(asyncTimeout, TimeUnit.MILLISECONDS);
- ar.setTimeoutHandler(new TimeoutHandlerImpl());
- }
- }
- @Override
- public void onStart() {
- if (asyncTimeout == 0) {
- resumeAsyncResponse();
- }
- }
- private void resumeAsyncResponse() {
- super.resume(new StreamingResponseImpl());
- }
- @Override
- public void onCompleted() {
- completed = true;
- }
-
- @Override
- public void onNext(T bean) {
- if (asyncTimeout > 0 && getAsyncResponse().isSuspended()) {
- resumeAsyncResponse();
- }
- queue.add(bean);
- }
- private class StreamingResponseImpl implements StreamingResponse<T> {
-
- @Override
- public void writeTo(Writer<T> writer) throws IOException {
- if (openTag != null) {
- writer.getEntityStream().write(StringUtils.getBytesUtf8(openTag));
- }
- while (!completed || queue.size() > 0) {
- try {
- T bean = queue.poll(pollTimeout, TimeUnit.MILLISECONDS);
- if (bean != null) {
- if (firstWriteDone) {
- writer.getEntityStream().write(StringUtils.getBytesUtf8(separator));
- }
- writer.write(bean);
- firstWriteDone = true;
-
- }
- } catch (InterruptedException ex) {
- // ignore
- }
- }
- if (closeTag != null) {
- writer.getEntityStream().write(StringUtils.getBytesUtf8(closeTag));
- }
-
- }
-
- }
- public class TimeoutHandlerImpl implements TimeoutHandler {
-
- @Override
- public void handleTimeout(AsyncResponse asyncResponse) {
- if (queue.isEmpty()) {
- asyncResponse.setTimeout(asyncTimeout, TimeUnit.MILLISECONDS);
- } else {
- resumeAsyncResponse();
- }
-
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/cxf/blob/6d82b75e/rt/rs/extensions/rx/pom.xml
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/pom.xml b/rt/rs/extensions/rx/pom.xml
new file mode 100644
index 0000000..a65484a
--- /dev/null
+++ b/rt/rs/extensions/rx/pom.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>cxf-rt-rs-extension-rx</artifactId>
+ <packaging>bundle</packaging>
+ <name>Apache CXF JAX-RS Extensions: RxJava</name>
+ <description>Apache CXF JAX-RS Extensions: RxJava</description>
+ <url>http://cxf.apache.org</url>
+ <parent>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-parent</artifactId>
+ <version>3.2.0-SNAPSHOT</version>
+ <relativePath>../../../../parent/pom.xml</relativePath>
+ </parent>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-frontend-jaxrs</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.reactivex</groupId>
+ <artifactId>rxjava</artifactId>
+ <version>${cxf.rx.java.version}</version>
+ <scope>provided</scope>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/cxf/blob/6d82b75e/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/AbstractAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/AbstractAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/AbstractAsyncSubscriber.java
new file mode 100644
index 0000000..80b1592
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/AbstractAsyncSubscriber.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx;
+
+import java.util.List;
+
+import javax.ws.rs.container.AsyncResponse;
+
+import org.apache.cxf.jaxrs.ext.StreamingResponse;
+
+import rx.Subscriber;
+
+public abstract class AbstractAsyncSubscriber<T> extends Subscriber<T> {
+
+ private AsyncResponse ar;
+
+ protected AbstractAsyncSubscriber(AsyncResponse ar) {
+ this.ar = ar;
+ }
+ public void resume(T response) {
+ ar.resume(response);
+ }
+
+ public void resume(List<T> response) {
+ ar.resume(response);
+ }
+
+ public void resume(StreamingResponse<T> response) {
+ ar.resume(response);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ ar.resume(t);
+ }
+
+ protected AsyncResponse getAsyncResponse() {
+ return ar;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/6d82b75e/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/JsonStreamingAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/JsonStreamingAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/JsonStreamingAsyncSubscriber.java
new file mode 100644
index 0000000..b5c22a4
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/JsonStreamingAsyncSubscriber.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx;
+
+import javax.ws.rs.container.AsyncResponse;
+
+public class JsonStreamingAsyncSubscriber<T> extends StreamingAsyncSubscriber<T> {
+ public JsonStreamingAsyncSubscriber(AsyncResponse ar) {
+ this(ar, 1000);
+ }
+ public JsonStreamingAsyncSubscriber(AsyncResponse ar, long pollTimeout) {
+ this(ar, pollTimeout, 0);
+ }
+ public JsonStreamingAsyncSubscriber(AsyncResponse ar, long pollTimeout, long asyncTimeout) {
+ super(ar, "[", "]", ",", pollTimeout, asyncTimeout);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/6d82b75e/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ListAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ListAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ListAsyncSubscriber.java
new file mode 100644
index 0000000..e94a861
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ListAsyncSubscriber.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.ws.rs.container.AsyncResponse;
+
+public class ListAsyncSubscriber<T> extends AbstractAsyncSubscriber<T> {
+
+ private List<T> beans = new LinkedList<T>();
+ public ListAsyncSubscriber(AsyncResponse ar) {
+ super(ar);
+ }
+ @Override
+ public void onCompleted() {
+ super.resume(beans);
+ }
+
+ @Override
+ public void onNext(T bean) {
+ beans.add(bean);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/6d82b75e/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ObservableReader.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ObservableReader.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ObservableReader.java
new file mode 100644
index 0000000..0e0780a
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ObservableReader.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Type;
+
+import javax.ws.rs.ProcessingException;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.ext.MessageBodyReader;
+import javax.ws.rs.ext.Providers;
+
+import org.apache.cxf.jaxrs.utils.InjectionUtils;
+
+import rx.Observable;
+
+public class ObservableReader<T> implements MessageBodyReader<Observable<T>> {
+
+ @Context
+ private Providers providers;
+
+ @Override
+ public boolean isReadable(Class<?> arg0, Type arg1, Annotation[] arg2, MediaType arg3) {
+ return true;
+ }
+
+ @Override
+ public Observable<T> readFrom(Class<Observable<T>> cls, Type t, Annotation[] anns, MediaType mt,
+ MultivaluedMap<String, String> headers, InputStream is)
+ throws IOException, WebApplicationException {
+ @SuppressWarnings("unchecked")
+ Class<T> actualCls = (Class<T>)InjectionUtils.getActualType(t);
+ final MessageBodyReader<T> mbr =
+ (MessageBodyReader<T>)providers.getMessageBodyReader(actualCls, actualCls, anns, mt);
+ if (mbr == null) {
+ throw new ProcessingException("MBR is null");
+ }
+ return Observable.just(mbr.readFrom(actualCls, actualCls, anns, mt, headers, is));
+ }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/6d82b75e/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ObservableWriter.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ObservableWriter.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ObservableWriter.java
new file mode 100644
index 0000000..475d36b
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ObservableWriter.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Type;
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.ext.MessageBodyWriter;
+import javax.ws.rs.ext.Provider;
+import javax.ws.rs.ext.Providers;
+
+import org.apache.cxf.jaxrs.utils.ExceptionUtils;
+import org.apache.cxf.jaxrs.utils.ParameterizedCollectionType;
+
+import rx.Observable;
+
+@Provider
+public class ObservableWriter<T> implements MessageBodyWriter<Observable<T>> {
+
+ @Context
+ private Providers providers;
+ private boolean writeSingleElementAsList;
+
+ @Override
+ public long getSize(Observable<T> arg0, Class<?> arg1, Type arg2, Annotation[] arg3, MediaType arg4) {
+ // TODO Auto-generated method stub
+ return -1;
+ }
+
+ @Override
+ public boolean isWriteable(Class<?> arg0, Type arg1, Annotation[] arg2, MediaType arg3) {
+ return true;
+ }
+
+ @Override
+ public void writeTo(Observable<T> obs, Class<?> cls, Type t, Annotation[] anns, MediaType mt,
+ MultivaluedMap<String, Object> headers, OutputStream os)
+ throws IOException, WebApplicationException {
+ List<T> entities = new LinkedList<T>();
+ obs.subscribe(value -> entities.add(value),
+ throwable -> throwError(throwable));
+ if (!entities.isEmpty()) {
+
+ if (entities.get(0) instanceof List) {
+ List<T> allEntities = new LinkedList<T>();
+ for (T obj : entities) {
+ @SuppressWarnings("unchecked")
+ List<T> listT = (List<T>)obj;
+ allEntities.addAll(listT);
+ }
+ writeToOutputStream(allEntities, anns, mt, headers, os);
+ } else if (entities.size() > 1 || writeSingleElementAsList) {
+ writeToOutputStream(entities, anns, mt, headers, os);
+ } else {
+ writeToOutputStream(entities.get(0), anns, mt, headers, os);
+ }
+ }
+ }
+
+ private void writeToOutputStream(Object value,
+ Annotation[] anns,
+ MediaType mt,
+ MultivaluedMap<String, Object> headers,
+ OutputStream os) {
+ Class<?> valueCls = value.getClass();
+ Type valueType = null;
+ if (value instanceof List) {
+ List<?> list = (List<?>)value;
+ valueType = new ParameterizedCollectionType(list.isEmpty() ? Object.class : list.get(0).getClass());
+ } else {
+ valueType = valueCls;
+ }
+ @SuppressWarnings("unchecked")
+ MessageBodyWriter<Object> writer =
+ (MessageBodyWriter<Object>)providers.getMessageBodyWriter(valueCls, valueType, anns, mt);
+ if (writer == null) {
+ throwError(null);
+ }
+
+ try {
+ writer.writeTo(value, valueCls, valueType, anns, mt, headers, os);
+ } catch (IOException ex) {
+ throwError(ex);
+ }
+ }
+
+ private static void throwError(Throwable cause) {
+ throw ExceptionUtils.toInternalServerErrorException(cause, null);
+ }
+
+ public void setWriteSingleElementAsList(boolean writeSingleElementAsList) {
+ this.writeSingleElementAsList = writeSingleElementAsList;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/6d82b75e/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/StreamingAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/StreamingAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/StreamingAsyncSubscriber.java
new file mode 100644
index 0000000..c531e98
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/StreamingAsyncSubscriber.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.TimeoutHandler;
+
+import org.apache.cxf.common.util.StringUtils;
+import org.apache.cxf.jaxrs.ext.StreamingResponse;
+
+public class StreamingAsyncSubscriber<T> extends AbstractAsyncSubscriber<T> {
+
+ private BlockingQueue<T> queue = new LinkedBlockingQueue<T>();
+ private String openTag;
+ private String closeTag;
+ private String separator;
+ private long pollTimeout;
+ private long asyncTimeout;
+ private volatile boolean completed;
+ private AtomicBoolean firstWriteDone = new AtomicBoolean();
+ public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep) {
+ this(ar, openTag, closeTag, "", 1000);
+ }
+ public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep,
+ long pollTimeout) {
+ this(ar, openTag, closeTag, sep, pollTimeout, 0);
+ }
+ public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep,
+ long pollTimeout, long asyncTimeout) {
+ super(ar);
+ this.openTag = openTag;
+ this.closeTag = closeTag;
+ this.separator = sep;
+ this.pollTimeout = pollTimeout;
+ this.asyncTimeout = 0;
+ if (asyncTimeout > 0) {
+ ar.setTimeout(asyncTimeout, TimeUnit.MILLISECONDS);
+ ar.setTimeoutHandler(new TimeoutHandlerImpl());
+ }
+ }
+ @Override
+ public void onStart() {
+ if (asyncTimeout == 0) {
+ resumeAsyncResponse();
+ }
+ }
+ private void resumeAsyncResponse() {
+ super.resume(new StreamingResponseImpl());
+ }
+ @Override
+ public void onCompleted() {
+ completed = true;
+ }
+
+ @Override
+ public void onNext(T bean) {
+ if (asyncTimeout > 0 && getAsyncResponse().isSuspended()) {
+ resumeAsyncResponse();
+ }
+ queue.add(bean);
+ }
+ private class StreamingResponseImpl implements StreamingResponse<T> {
+
+ @Override
+ public void writeTo(Writer<T> writer) throws IOException {
+ if (openTag != null) {
+ writer.getEntityStream().write(StringUtils.toBytesUTF8(openTag));
+ }
+ while (!completed || queue.size() > 0) {
+ try {
+ T bean = queue.poll(pollTimeout, TimeUnit.MILLISECONDS);
+ if (bean != null) {
+ if (firstWriteDone.getAndSet(true)) {
+ writer.getEntityStream().write(StringUtils.toBytesUTF8(separator));
+ }
+ writer.write(bean);
+ }
+ } catch (InterruptedException ex) {
+ // ignore
+ }
+ }
+ if (closeTag != null) {
+ writer.getEntityStream().write(StringUtils.toBytesUTF8(closeTag));
+ }
+
+ }
+
+ }
+ public class TimeoutHandlerImpl implements TimeoutHandler {
+
+ @Override
+ public void handleTimeout(AsyncResponse asyncResponse) {
+ if (queue.isEmpty()) {
+ asyncResponse.setTimeout(asyncTimeout, TimeUnit.MILLISECONDS);
+ } else {
+ resumeAsyncResponse();
+ }
+
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/6d82b75e/rt/rs/extensions/rx/src/test/java/org/apache/cxf/jaxrs/rx/ObservableWriterTest.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/test/java/org/apache/cxf/jaxrs/rx/ObservableWriterTest.java b/rt/rs/extensions/rx/src/test/java/org/apache/cxf/jaxrs/rx/ObservableWriterTest.java
new file mode 100644
index 0000000..c6d0086
--- /dev/null
+++ b/rt/rs/extensions/rx/src/test/java/org/apache/cxf/jaxrs/rx/ObservableWriterTest.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.jaxrs.rx;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ObservableWriterTest extends Assert {
+
+
+ @Test
+ public void testIsWriteable() {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/6d82b75e/rt/rs/pom.xml
----------------------------------------------------------------------
diff --git a/rt/rs/pom.xml b/rt/rs/pom.xml
index a374743..0348e4b 100644
--- a/rt/rs/pom.xml
+++ b/rt/rs/pom.xml
@@ -37,6 +37,7 @@
<module>extensions/json-basic</module>
<module>extensions/providers</module>
<module>extensions/search</module>
+ <module>extensions/rx</module>
<module>security</module>
</modules>
</project>
http://git-wip-us.apache.org/repos/asf/cxf/blob/6d82b75e/systests/jaxrs/pom.xml
----------------------------------------------------------------------
diff --git a/systests/jaxrs/pom.xml b/systests/jaxrs/pom.xml
index 17a19b1..fe9bc56 100644
--- a/systests/jaxrs/pom.xml
+++ b/systests/jaxrs/pom.xml
@@ -328,6 +328,11 @@
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-rs-extension-rx</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-rs-security-cors</artifactId>
<version>${project.version}</version>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/cxf/blob/6d82b75e/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java
index 45a9468..2f4c496 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java
@@ -29,7 +29,7 @@ import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
import org.apache.cxf.jaxrs.client.WebClient;
import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
-import org.apache.cxf.jaxrs.provider.rx.ObservableReader;
+import org.apache.cxf.jaxrs.rx.ObservableReader;
import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
import org.junit.BeforeClass;
http://git-wip-us.apache.org/repos/asf/cxf/blob/6d82b75e/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java
index d12641a..4915b71 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java
@@ -29,7 +29,7 @@ import org.apache.cxf.interceptor.LoggingOutInterceptor;
import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
import org.apache.cxf.jaxrs.provider.StreamingResponseProvider;
-import org.apache.cxf.jaxrs.provider.rx.ObservableWriter;
+import org.apache.cxf.jaxrs.rx.ObservableWriter;
import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
http://git-wip-us.apache.org/repos/asf/cxf/blob/6d82b75e/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java
index 6081f2b..bac9472 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java
@@ -29,9 +29,9 @@ import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
-import org.apache.cxf.jaxrs.provider.rx.AbstractAsyncSubscriber;
-import org.apache.cxf.jaxrs.provider.rx.JsonStreamingAsyncSubscriber;
-import org.apache.cxf.jaxrs.provider.rx.ListAsyncSubscriber;
+import org.apache.cxf.jaxrs.rx.AbstractAsyncSubscriber;
+import org.apache.cxf.jaxrs.rx.JsonStreamingAsyncSubscriber;
+import org.apache.cxf.jaxrs.rx.ListAsyncSubscriber;
import rx.Observable;
import rx.schedulers.Schedulers;