You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by se...@apache.org on 2016/09/02 15:27:57 UTC
cxf git commit: [CXF-6833] Package updates
Repository: cxf
Updated Branches:
refs/heads/master 6d82b75eb -> 6fcdc7e9e
[CXF-6833] Package updates
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/6fcdc7e9
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/6fcdc7e9
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/6fcdc7e9
Branch: refs/heads/master
Commit: 6fcdc7e9eec2414a40bd3161cf237ef182e313a6
Parents: 6d82b75
Author: Sergey Beryozkin <sb...@gmail.com>
Authored: Fri Sep 2 16:27:40 2016 +0100
Committer: Sergey Beryozkin <sb...@gmail.com>
Committed: Fri Sep 2 16:27:40 2016 +0100
----------------------------------------------------------------------
.../cxf/jaxrs/rx/AbstractAsyncSubscriber.java | 56 ---------
.../jaxrs/rx/JsonStreamingAsyncSubscriber.java | 34 -----
.../cxf/jaxrs/rx/ListAsyncSubscriber.java | 42 -------
.../apache/cxf/jaxrs/rx/ObservableReader.java | 61 ---------
.../apache/cxf/jaxrs/rx/ObservableWriter.java | 119 ------------------
.../cxf/jaxrs/rx/StreamingAsyncSubscriber.java | 124 -------------------
.../cxf/jaxrs/rx/provider/ObservableReader.java | 61 +++++++++
.../cxf/jaxrs/rx/provider/ObservableWriter.java | 119 ++++++++++++++++++
.../rx/server/AbstractAsyncSubscriber.java | 56 +++++++++
.../rx/server/JsonStreamingAsyncSubscriber.java | 34 +++++
.../jaxrs/rx/server/ListAsyncSubscriber.java | 42 +++++++
.../rx/server/StreamingAsyncSubscriber.java | 124 +++++++++++++++++++
.../cxf/jaxrs/rx/ObservableWriterTest.java | 32 -----
.../jaxrs/rx/provider/ObservableWriterTest.java | 32 +++++
.../jaxrs/reactive/JAXRSReactiveTest.java | 2 +-
.../systest/jaxrs/reactive/ReactiveServer.java | 2 +-
.../systest/jaxrs/reactive/ReactiveService.java | 6 +-
17 files changed, 473 insertions(+), 473 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/AbstractAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/AbstractAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/AbstractAsyncSubscriber.java
deleted file mode 100644
index 80b1592..0000000
--- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/AbstractAsyncSubscriber.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.rx;
-
-import java.util.List;
-
-import javax.ws.rs.container.AsyncResponse;
-
-import org.apache.cxf.jaxrs.ext.StreamingResponse;
-
-import rx.Subscriber;
-
-public abstract class AbstractAsyncSubscriber<T> extends Subscriber<T> {
-
- private AsyncResponse ar;
-
- protected AbstractAsyncSubscriber(AsyncResponse ar) {
- this.ar = ar;
- }
- public void resume(T response) {
- ar.resume(response);
- }
-
- public void resume(List<T> response) {
- ar.resume(response);
- }
-
- public void resume(StreamingResponse<T> response) {
- ar.resume(response);
- }
-
- @Override
- public void onError(Throwable t) {
- ar.resume(t);
- }
-
- protected AsyncResponse getAsyncResponse() {
- return ar;
- }
-}
http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/JsonStreamingAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/JsonStreamingAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/JsonStreamingAsyncSubscriber.java
deleted file mode 100644
index b5c22a4..0000000
--- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/JsonStreamingAsyncSubscriber.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.rx;
-
-import javax.ws.rs.container.AsyncResponse;
-
-public class JsonStreamingAsyncSubscriber<T> extends StreamingAsyncSubscriber<T> {
- public JsonStreamingAsyncSubscriber(AsyncResponse ar) {
- this(ar, 1000);
- }
- public JsonStreamingAsyncSubscriber(AsyncResponse ar, long pollTimeout) {
- this(ar, pollTimeout, 0);
- }
- public JsonStreamingAsyncSubscriber(AsyncResponse ar, long pollTimeout, long asyncTimeout) {
- super(ar, "[", "]", ",", pollTimeout, asyncTimeout);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ListAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ListAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ListAsyncSubscriber.java
deleted file mode 100644
index e94a861..0000000
--- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ListAsyncSubscriber.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.rx;
-
-import java.util.LinkedList;
-import java.util.List;
-
-import javax.ws.rs.container.AsyncResponse;
-
-public class ListAsyncSubscriber<T> extends AbstractAsyncSubscriber<T> {
-
- private List<T> beans = new LinkedList<T>();
- public ListAsyncSubscriber(AsyncResponse ar) {
- super(ar);
- }
- @Override
- public void onCompleted() {
- super.resume(beans);
- }
-
- @Override
- public void onNext(T bean) {
- beans.add(bean);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ObservableReader.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ObservableReader.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ObservableReader.java
deleted file mode 100644
index 0e0780a..0000000
--- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ObservableReader.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.rx;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.lang.annotation.Annotation;
-import java.lang.reflect.Type;
-
-import javax.ws.rs.ProcessingException;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.MultivaluedMap;
-import javax.ws.rs.ext.MessageBodyReader;
-import javax.ws.rs.ext.Providers;
-
-import org.apache.cxf.jaxrs.utils.InjectionUtils;
-
-import rx.Observable;
-
-public class ObservableReader<T> implements MessageBodyReader<Observable<T>> {
-
- @Context
- private Providers providers;
-
- @Override
- public boolean isReadable(Class<?> arg0, Type arg1, Annotation[] arg2, MediaType arg3) {
- return true;
- }
-
- @Override
- public Observable<T> readFrom(Class<Observable<T>> cls, Type t, Annotation[] anns, MediaType mt,
- MultivaluedMap<String, String> headers, InputStream is)
- throws IOException, WebApplicationException {
- @SuppressWarnings("unchecked")
- Class<T> actualCls = (Class<T>)InjectionUtils.getActualType(t);
- final MessageBodyReader<T> mbr =
- (MessageBodyReader<T>)providers.getMessageBodyReader(actualCls, actualCls, anns, mt);
- if (mbr == null) {
- throw new ProcessingException("MBR is null");
- }
- return Observable.just(mbr.readFrom(actualCls, actualCls, anns, mt, headers, is));
- }
-}
http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ObservableWriter.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ObservableWriter.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ObservableWriter.java
deleted file mode 100644
index 475d36b..0000000
--- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ObservableWriter.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.rx;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.lang.annotation.Annotation;
-import java.lang.reflect.Type;
-import java.util.LinkedList;
-import java.util.List;
-
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.MultivaluedMap;
-import javax.ws.rs.ext.MessageBodyWriter;
-import javax.ws.rs.ext.Provider;
-import javax.ws.rs.ext.Providers;
-
-import org.apache.cxf.jaxrs.utils.ExceptionUtils;
-import org.apache.cxf.jaxrs.utils.ParameterizedCollectionType;
-
-import rx.Observable;
-
-@Provider
-public class ObservableWriter<T> implements MessageBodyWriter<Observable<T>> {
-
- @Context
- private Providers providers;
- private boolean writeSingleElementAsList;
-
- @Override
- public long getSize(Observable<T> arg0, Class<?> arg1, Type arg2, Annotation[] arg3, MediaType arg4) {
- // TODO Auto-generated method stub
- return -1;
- }
-
- @Override
- public boolean isWriteable(Class<?> arg0, Type arg1, Annotation[] arg2, MediaType arg3) {
- return true;
- }
-
- @Override
- public void writeTo(Observable<T> obs, Class<?> cls, Type t, Annotation[] anns, MediaType mt,
- MultivaluedMap<String, Object> headers, OutputStream os)
- throws IOException, WebApplicationException {
- List<T> entities = new LinkedList<T>();
- obs.subscribe(value -> entities.add(value),
- throwable -> throwError(throwable));
- if (!entities.isEmpty()) {
-
- if (entities.get(0) instanceof List) {
- List<T> allEntities = new LinkedList<T>();
- for (T obj : entities) {
- @SuppressWarnings("unchecked")
- List<T> listT = (List<T>)obj;
- allEntities.addAll(listT);
- }
- writeToOutputStream(allEntities, anns, mt, headers, os);
- } else if (entities.size() > 1 || writeSingleElementAsList) {
- writeToOutputStream(entities, anns, mt, headers, os);
- } else {
- writeToOutputStream(entities.get(0), anns, mt, headers, os);
- }
- }
- }
-
- private void writeToOutputStream(Object value,
- Annotation[] anns,
- MediaType mt,
- MultivaluedMap<String, Object> headers,
- OutputStream os) {
- Class<?> valueCls = value.getClass();
- Type valueType = null;
- if (value instanceof List) {
- List<?> list = (List<?>)value;
- valueType = new ParameterizedCollectionType(list.isEmpty() ? Object.class : list.get(0).getClass());
- } else {
- valueType = valueCls;
- }
- @SuppressWarnings("unchecked")
- MessageBodyWriter<Object> writer =
- (MessageBodyWriter<Object>)providers.getMessageBodyWriter(valueCls, valueType, anns, mt);
- if (writer == null) {
- throwError(null);
- }
-
- try {
- writer.writeTo(value, valueCls, valueType, anns, mt, headers, os);
- } catch (IOException ex) {
- throwError(ex);
- }
- }
-
- private static void throwError(Throwable cause) {
- throw ExceptionUtils.toInternalServerErrorException(cause, null);
- }
-
- public void setWriteSingleElementAsList(boolean writeSingleElementAsList) {
- this.writeSingleElementAsList = writeSingleElementAsList;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/StreamingAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/StreamingAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/StreamingAsyncSubscriber.java
deleted file mode 100644
index c531e98..0000000
--- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/StreamingAsyncSubscriber.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.rx;
-
-import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.ws.rs.container.AsyncResponse;
-import javax.ws.rs.container.TimeoutHandler;
-
-import org.apache.cxf.common.util.StringUtils;
-import org.apache.cxf.jaxrs.ext.StreamingResponse;
-
-public class StreamingAsyncSubscriber<T> extends AbstractAsyncSubscriber<T> {
-
- private BlockingQueue<T> queue = new LinkedBlockingQueue<T>();
- private String openTag;
- private String closeTag;
- private String separator;
- private long pollTimeout;
- private long asyncTimeout;
- private volatile boolean completed;
- private AtomicBoolean firstWriteDone = new AtomicBoolean();
- public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep) {
- this(ar, openTag, closeTag, "", 1000);
- }
- public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep,
- long pollTimeout) {
- this(ar, openTag, closeTag, sep, pollTimeout, 0);
- }
- public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep,
- long pollTimeout, long asyncTimeout) {
- super(ar);
- this.openTag = openTag;
- this.closeTag = closeTag;
- this.separator = sep;
- this.pollTimeout = pollTimeout;
- this.asyncTimeout = 0;
- if (asyncTimeout > 0) {
- ar.setTimeout(asyncTimeout, TimeUnit.MILLISECONDS);
- ar.setTimeoutHandler(new TimeoutHandlerImpl());
- }
- }
- @Override
- public void onStart() {
- if (asyncTimeout == 0) {
- resumeAsyncResponse();
- }
- }
- private void resumeAsyncResponse() {
- super.resume(new StreamingResponseImpl());
- }
- @Override
- public void onCompleted() {
- completed = true;
- }
-
- @Override
- public void onNext(T bean) {
- if (asyncTimeout > 0 && getAsyncResponse().isSuspended()) {
- resumeAsyncResponse();
- }
- queue.add(bean);
- }
- private class StreamingResponseImpl implements StreamingResponse<T> {
-
- @Override
- public void writeTo(Writer<T> writer) throws IOException {
- if (openTag != null) {
- writer.getEntityStream().write(StringUtils.toBytesUTF8(openTag));
- }
- while (!completed || queue.size() > 0) {
- try {
- T bean = queue.poll(pollTimeout, TimeUnit.MILLISECONDS);
- if (bean != null) {
- if (firstWriteDone.getAndSet(true)) {
- writer.getEntityStream().write(StringUtils.toBytesUTF8(separator));
- }
- writer.write(bean);
- }
- } catch (InterruptedException ex) {
- // ignore
- }
- }
- if (closeTag != null) {
- writer.getEntityStream().write(StringUtils.toBytesUTF8(closeTag));
- }
-
- }
-
- }
- public class TimeoutHandlerImpl implements TimeoutHandler {
-
- @Override
- public void handleTimeout(AsyncResponse asyncResponse) {
- if (queue.isEmpty()) {
- asyncResponse.setTimeout(asyncTimeout, TimeUnit.MILLISECONDS);
- } else {
- resumeAsyncResponse();
- }
-
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/provider/ObservableReader.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/provider/ObservableReader.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/provider/ObservableReader.java
new file mode 100644
index 0000000..f05f478
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/provider/ObservableReader.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx.provider;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Type;
+
+import javax.ws.rs.ProcessingException;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.ext.MessageBodyReader;
+import javax.ws.rs.ext.Providers;
+
+import org.apache.cxf.jaxrs.utils.InjectionUtils;
+
+import rx.Observable;
+
+public class ObservableReader<T> implements MessageBodyReader<Observable<T>> {
+
+ @Context
+ private Providers providers;
+
+ @Override
+ public boolean isReadable(Class<?> arg0, Type arg1, Annotation[] arg2, MediaType arg3) {
+ return true;
+ }
+
+ @Override
+ public Observable<T> readFrom(Class<Observable<T>> cls, Type t, Annotation[] anns, MediaType mt,
+ MultivaluedMap<String, String> headers, InputStream is)
+ throws IOException, WebApplicationException {
+ @SuppressWarnings("unchecked")
+ Class<T> actualCls = (Class<T>)InjectionUtils.getActualType(t);
+ final MessageBodyReader<T> mbr =
+ (MessageBodyReader<T>)providers.getMessageBodyReader(actualCls, actualCls, anns, mt);
+ if (mbr == null) {
+ throw new ProcessingException("MBR is null");
+ }
+ return Observable.just(mbr.readFrom(actualCls, actualCls, anns, mt, headers, is));
+ }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/provider/ObservableWriter.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/provider/ObservableWriter.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/provider/ObservableWriter.java
new file mode 100644
index 0000000..b4ed2af
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/provider/ObservableWriter.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx.provider;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Type;
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.ext.MessageBodyWriter;
+import javax.ws.rs.ext.Provider;
+import javax.ws.rs.ext.Providers;
+
+import org.apache.cxf.jaxrs.utils.ExceptionUtils;
+import org.apache.cxf.jaxrs.utils.ParameterizedCollectionType;
+
+import rx.Observable;
+
+@Provider
+public class ObservableWriter<T> implements MessageBodyWriter<Observable<T>> {
+
+ @Context
+ private Providers providers;
+ private boolean writeSingleElementAsList;
+
+ @Override
+ public long getSize(Observable<T> arg0, Class<?> arg1, Type arg2, Annotation[] arg3, MediaType arg4) {
+ // TODO Auto-generated method stub
+ return -1;
+ }
+
+ @Override
+ public boolean isWriteable(Class<?> arg0, Type arg1, Annotation[] arg2, MediaType arg3) {
+ return true;
+ }
+
+ @Override
+ public void writeTo(Observable<T> obs, Class<?> cls, Type t, Annotation[] anns, MediaType mt,
+ MultivaluedMap<String, Object> headers, OutputStream os)
+ throws IOException, WebApplicationException {
+ List<T> entities = new LinkedList<T>();
+ obs.subscribe(value -> entities.add(value),
+ throwable -> throwError(throwable));
+ if (!entities.isEmpty()) {
+
+ if (entities.get(0) instanceof List) {
+ List<T> allEntities = new LinkedList<T>();
+ for (T obj : entities) {
+ @SuppressWarnings("unchecked")
+ List<T> listT = (List<T>)obj;
+ allEntities.addAll(listT);
+ }
+ writeToOutputStream(allEntities, anns, mt, headers, os);
+ } else if (entities.size() > 1 || writeSingleElementAsList) {
+ writeToOutputStream(entities, anns, mt, headers, os);
+ } else {
+ writeToOutputStream(entities.get(0), anns, mt, headers, os);
+ }
+ }
+ }
+
+ private void writeToOutputStream(Object value,
+ Annotation[] anns,
+ MediaType mt,
+ MultivaluedMap<String, Object> headers,
+ OutputStream os) {
+ Class<?> valueCls = value.getClass();
+ Type valueType = null;
+ if (value instanceof List) {
+ List<?> list = (List<?>)value;
+ valueType = new ParameterizedCollectionType(list.isEmpty() ? Object.class : list.get(0).getClass());
+ } else {
+ valueType = valueCls;
+ }
+ @SuppressWarnings("unchecked")
+ MessageBodyWriter<Object> writer =
+ (MessageBodyWriter<Object>)providers.getMessageBodyWriter(valueCls, valueType, anns, mt);
+ if (writer == null) {
+ throwError(null);
+ }
+
+ try {
+ writer.writeTo(value, valueCls, valueType, anns, mt, headers, os);
+ } catch (IOException ex) {
+ throwError(ex);
+ }
+ }
+
+ private static void throwError(Throwable cause) {
+ throw ExceptionUtils.toInternalServerErrorException(cause, null);
+ }
+
+ public void setWriteSingleElementAsList(boolean writeSingleElementAsList) {
+ this.writeSingleElementAsList = writeSingleElementAsList;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/AbstractAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/AbstractAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/AbstractAsyncSubscriber.java
new file mode 100644
index 0000000..94a85fc
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/AbstractAsyncSubscriber.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx.server;
+
+import java.util.List;
+
+import javax.ws.rs.container.AsyncResponse;
+
+import org.apache.cxf.jaxrs.ext.StreamingResponse;
+
+import rx.Subscriber;
+
+public abstract class AbstractAsyncSubscriber<T> extends Subscriber<T> {
+
+ private AsyncResponse ar;
+
+ protected AbstractAsyncSubscriber(AsyncResponse ar) {
+ this.ar = ar;
+ }
+ public void resume(T response) {
+ ar.resume(response);
+ }
+
+ public void resume(List<T> response) {
+ ar.resume(response);
+ }
+
+ public void resume(StreamingResponse<T> response) {
+ ar.resume(response);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ ar.resume(t);
+ }
+
+ protected AsyncResponse getAsyncResponse() {
+ return ar;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/JsonStreamingAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/JsonStreamingAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/JsonStreamingAsyncSubscriber.java
new file mode 100644
index 0000000..c71c8c1
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/JsonStreamingAsyncSubscriber.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx.server;
+
+import javax.ws.rs.container.AsyncResponse;
+
+public class JsonStreamingAsyncSubscriber<T> extends StreamingAsyncSubscriber<T> {
+ public JsonStreamingAsyncSubscriber(AsyncResponse ar) {
+ this(ar, 1000);
+ }
+ public JsonStreamingAsyncSubscriber(AsyncResponse ar, long pollTimeout) {
+ this(ar, pollTimeout, 0);
+ }
+ public JsonStreamingAsyncSubscriber(AsyncResponse ar, long pollTimeout, long asyncTimeout) {
+ super(ar, "[", "]", ",", pollTimeout, asyncTimeout);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/ListAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/ListAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/ListAsyncSubscriber.java
new file mode 100644
index 0000000..5be73e5
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/ListAsyncSubscriber.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx.server;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.ws.rs.container.AsyncResponse;
+
+public class ListAsyncSubscriber<T> extends AbstractAsyncSubscriber<T> {
+
+ private List<T> beans = new LinkedList<T>();
+ public ListAsyncSubscriber(AsyncResponse ar) {
+ super(ar);
+ }
+ @Override
+ public void onCompleted() {
+ super.resume(beans);
+ }
+
+ @Override
+ public void onNext(T bean) {
+ beans.add(bean);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/StreamingAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/StreamingAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/StreamingAsyncSubscriber.java
new file mode 100644
index 0000000..bd16292
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/StreamingAsyncSubscriber.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx.server;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.TimeoutHandler;
+
+import org.apache.cxf.common.util.StringUtils;
+import org.apache.cxf.jaxrs.ext.StreamingResponse;
+
+public class StreamingAsyncSubscriber<T> extends AbstractAsyncSubscriber<T> {
+
+ private BlockingQueue<T> queue = new LinkedBlockingQueue<T>();
+ private String openTag;
+ private String closeTag;
+ private String separator;
+ private long pollTimeout;
+ private long asyncTimeout;
+ private volatile boolean completed;
+ private AtomicBoolean firstWriteDone = new AtomicBoolean();
+ public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep) {
+ this(ar, openTag, closeTag, "", 1000);
+ }
+ public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep,
+ long pollTimeout) {
+ this(ar, openTag, closeTag, sep, pollTimeout, 0);
+ }
+ public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep,
+ long pollTimeout, long asyncTimeout) {
+ super(ar);
+ this.openTag = openTag;
+ this.closeTag = closeTag;
+ this.separator = sep;
+ this.pollTimeout = pollTimeout;
+ this.asyncTimeout = 0;
+ if (asyncTimeout > 0) {
+ ar.setTimeout(asyncTimeout, TimeUnit.MILLISECONDS);
+ ar.setTimeoutHandler(new TimeoutHandlerImpl());
+ }
+ }
+ @Override
+ public void onStart() {
+ if (asyncTimeout == 0) {
+ resumeAsyncResponse();
+ }
+ }
+ private void resumeAsyncResponse() {
+ super.resume(new StreamingResponseImpl());
+ }
+ @Override
+ public void onCompleted() {
+ completed = true;
+ }
+
+ @Override
+ public void onNext(T bean) {
+ if (asyncTimeout > 0 && getAsyncResponse().isSuspended()) {
+ resumeAsyncResponse();
+ }
+ queue.add(bean);
+ }
+ private class StreamingResponseImpl implements StreamingResponse<T> {
+
+ @Override
+ public void writeTo(Writer<T> writer) throws IOException {
+ if (openTag != null) {
+ writer.getEntityStream().write(StringUtils.toBytesUTF8(openTag));
+ }
+ while (!completed || queue.size() > 0) {
+ try {
+ T bean = queue.poll(pollTimeout, TimeUnit.MILLISECONDS);
+ if (bean != null) {
+ if (firstWriteDone.getAndSet(true)) {
+ writer.getEntityStream().write(StringUtils.toBytesUTF8(separator));
+ }
+ writer.write(bean);
+ }
+ } catch (InterruptedException ex) {
+ // ignore
+ }
+ }
+ if (closeTag != null) {
+ writer.getEntityStream().write(StringUtils.toBytesUTF8(closeTag));
+ }
+
+ }
+
+ }
+ public class TimeoutHandlerImpl implements TimeoutHandler {
+
+ @Override
+ public void handleTimeout(AsyncResponse asyncResponse) {
+ if (queue.isEmpty()) {
+ asyncResponse.setTimeout(asyncTimeout, TimeUnit.MILLISECONDS);
+ } else {
+ resumeAsyncResponse();
+ }
+
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/rt/rs/extensions/rx/src/test/java/org/apache/cxf/jaxrs/rx/ObservableWriterTest.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/test/java/org/apache/cxf/jaxrs/rx/ObservableWriterTest.java b/rt/rs/extensions/rx/src/test/java/org/apache/cxf/jaxrs/rx/ObservableWriterTest.java
deleted file mode 100644
index c6d0086..0000000
--- a/rt/rs/extensions/rx/src/test/java/org/apache/cxf/jaxrs/rx/ObservableWriterTest.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cxf.jaxrs.rx;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-public class ObservableWriterTest extends Assert {
-
-
- @Test
- public void testIsWriteable() {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/rt/rs/extensions/rx/src/test/java/org/apache/cxf/jaxrs/rx/provider/ObservableWriterTest.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/test/java/org/apache/cxf/jaxrs/rx/provider/ObservableWriterTest.java b/rt/rs/extensions/rx/src/test/java/org/apache/cxf/jaxrs/rx/provider/ObservableWriterTest.java
new file mode 100644
index 0000000..049377b
--- /dev/null
+++ b/rt/rs/extensions/rx/src/test/java/org/apache/cxf/jaxrs/rx/provider/ObservableWriterTest.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.jaxrs.rx.provider;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ObservableWriterTest extends Assert {
+
+
+ @Test
+ public void testIsWriteable() {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java
index 2f4c496..59da436 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java
@@ -29,7 +29,7 @@ import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
import org.apache.cxf.jaxrs.client.WebClient;
import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
-import org.apache.cxf.jaxrs.rx.ObservableReader;
+import org.apache.cxf.jaxrs.rx.provider.ObservableReader;
import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
import org.junit.BeforeClass;
http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java
index 4915b71..afd5cbb 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java
@@ -29,7 +29,7 @@ import org.apache.cxf.interceptor.LoggingOutInterceptor;
import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
import org.apache.cxf.jaxrs.provider.StreamingResponseProvider;
-import org.apache.cxf.jaxrs.rx.ObservableWriter;
+import org.apache.cxf.jaxrs.rx.provider.ObservableWriter;
import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java
index bac9472..9c9ae08 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java
@@ -29,9 +29,9 @@ import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
-import org.apache.cxf.jaxrs.rx.AbstractAsyncSubscriber;
-import org.apache.cxf.jaxrs.rx.JsonStreamingAsyncSubscriber;
-import org.apache.cxf.jaxrs.rx.ListAsyncSubscriber;
+import org.apache.cxf.jaxrs.rx.server.AbstractAsyncSubscriber;
+import org.apache.cxf.jaxrs.rx.server.JsonStreamingAsyncSubscriber;
+import org.apache.cxf.jaxrs.rx.server.ListAsyncSubscriber;
import rx.Observable;
import rx.schedulers.Schedulers;