You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2016/09/05 01:10:09 UTC
[02/23] cxf git commit: [CXF-6833] Better support for implicit lists,
simpler async subscription code
[CXF-6833] Better support for implicit lists, simpler async subscription code
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/502db47a
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/502db47a
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/502db47a
Branch: refs/heads/master-jaxrs-2.1
Commit: 502db47a7c520767da2977376be5cf2fce3f56af
Parents: 8e753ad
Author: Sergey Beryozkin <sb...@gmail.com>
Authored: Tue Aug 30 18:02:32 2016 +0100
Committer: Sergey Beryozkin <sb...@gmail.com>
Committed: Tue Aug 30 18:02:32 2016 +0100
----------------------------------------------------------------------
.../provider/rx/AbstractAsyncSubscriber.java | 44 ++++++++++++++++++
.../cxf/jaxrs/provider/rx/ObservableWriter.java | 47 +++++++++++++++++---
.../jaxrs/reactive/JAXRSReactiveTest.java | 36 +++++++++++++--
.../systest/jaxrs/reactive/ReactiveServer.java | 2 +
.../systest/jaxrs/reactive/ReactiveService.java | 44 ++++++++++++------
5 files changed, 149 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cxf/blob/502db47a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/AbstractAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/AbstractAsyncSubscriber.java b/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/AbstractAsyncSubscriber.java
new file mode 100644
index 0000000..ae4459c
--- /dev/null
+++ b/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/AbstractAsyncSubscriber.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.provider.rx;
+
+import javax.ws.rs.container.AsyncResponse;
+
+import rx.Subscriber;
+
+public abstract class AbstractAsyncSubscriber<T> extends Subscriber<T> { // rx Subscriber base class that wraps a JAX-RS AsyncResponse
+ // Only onError is implemented in this class; it is abstract, so the other Subscriber callbacks are left to subclasses.
+ private AsyncResponse ar; // NOTE(review): assigned only in the constructor here; could be declared final
+
+ protected AbstractAsyncSubscriber(AsyncResponse ar) {
+ this.ar = ar;
+ }
+ public void resume(T response) { // resumes the wrapped AsyncResponse with the given entity
+ ar.resume(response);
+ }
+
+ @Override
+ public void onError(Throwable t) { // an error signalled to this subscriber resumes the response with that throwable
+ ar.resume(t);
+ }
+
+ protected AsyncResponse getAsyncResponse() { // gives subclasses access to the wrapped AsyncResponse
+ return ar;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/502db47a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ObservableWriter.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ObservableWriter.java b/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ObservableWriter.java
index 929709b..6317506 100644
--- a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ObservableWriter.java
+++ b/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ObservableWriter.java
@@ -22,22 +22,28 @@ import java.io.IOException;
import java.io.OutputStream;
import java.lang.annotation.Annotation;
import java.lang.reflect.Type;
+import java.util.LinkedList;
+import java.util.List;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.ext.MessageBodyWriter;
+import javax.ws.rs.ext.Provider;
import javax.ws.rs.ext.Providers;
import org.apache.cxf.jaxrs.utils.ExceptionUtils;
+import org.apache.cxf.jaxrs.utils.ParameterizedCollectionType;
import rx.Observable;
+@Provider
public class ObservableWriter<T> implements MessageBodyWriter<Observable<T>> {
@Context
private Providers providers;
+ private boolean writeSingleElementAsList;
@Override
public long getSize(Observable<T> arg0, Class<?> arg1, Type arg2, Annotation[] arg3, MediaType arg4) {
@@ -54,24 +60,49 @@ public class ObservableWriter<T> implements MessageBodyWriter<Observable<T>> {
public void writeTo(Observable<T> obs, Class<?> cls, Type t, Annotation[] anns, MediaType mt,
MultivaluedMap<String, Object> headers, OutputStream os)
throws IOException, WebApplicationException {
- obs.subscribe(value -> writeToOutputStream(value, anns, mt, headers, os),
- throwable -> throwError(throwable));
+ List<T> entities = new LinkedList<T>(); // items are now collected first instead of being written one by one as they arrive
+ obs.subscribe(value -> entities.add(value), // NOTE(review): the checks below run right after subscribe() returns; assumes all items were emitted by then - confirm for asynchronous Observables
+ throwable -> throwError(throwable));
+ if (!entities.isEmpty()) { // nothing is written when no item was collected
+
+ if (entities.get(0) instanceof List) { // first item is itself a List: flatten all items into a single list
+ List<T> allEntities = new LinkedList<T>();
+ for (T obj : entities) {
+ @SuppressWarnings("unchecked")
+ List<T> listT = (List<T>)obj; // NOTE(review): only element 0 was checked with instanceof; a later non-List item would fail this cast
+ allEntities.addAll(listT);
+ }
+ writeToOutputStream(allEntities, anns, mt, headers, os);
+ } else if (entities.size() > 1 || writeSingleElementAsList) { // several items, or a single item with list output forced: write the collected list
+ writeToOutputStream(entities, anns, mt, headers, os);
+ } else { // exactly one item and list output not forced: write the item itself
+ writeToOutputStream(entities.get(0), anns, mt, headers, os);
+ }
+ }
}
- private void writeToOutputStream(T value,
+ private void writeToOutputStream(Object value,
Annotation[] anns,
MediaType mt,
MultivaluedMap<String, Object> headers,
OutputStream os) {
+ Class<?> valueCls = value.getClass();
+ Type valueType = null;
+ if (value instanceof List) {
+ List<?> list = (List<?>)value;
+ valueType = new ParameterizedCollectionType(list.isEmpty() ? Object.class : list.get(0).getClass());
+ } else {
+ valueType = valueCls;
+ }
@SuppressWarnings("unchecked")
- MessageBodyWriter<T> writer =
- (MessageBodyWriter<T>)providers.getMessageBodyWriter(value.getClass(), value.getClass(), anns, mt);
+ MessageBodyWriter<Object> writer =
+ (MessageBodyWriter<Object>)providers.getMessageBodyWriter(valueCls, valueType, anns, mt);
if (writer == null) {
throwError(null);
}
try {
- writer.writeTo(value, value.getClass(), value.getClass(), anns, mt, headers, os);
+ writer.writeTo(value, valueCls, valueType, anns, mt, headers, os);
} catch (IOException ex) {
throwError(ex);
}
@@ -81,4 +112,8 @@ public class ObservableWriter<T> implements MessageBodyWriter<Observable<T>> {
throw ExceptionUtils.toInternalServerErrorException(cause, null);
}
+ public void setWriteSingleElementAsList(boolean writeSingleElementAsList) { // when true, writeTo passes a single collected item to the writer as a list rather than as a bare value
+ this.writeSingleElementAsList = writeSingleElementAsList;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/502db47a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java
index 535831d..f82d139 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java
@@ -20,10 +20,13 @@
package org.apache.cxf.systest.jaxrs.reactive;
import java.util.Collections;
+import java.util.List;
import java.util.concurrent.Future;
import javax.ws.rs.core.GenericType;
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
import org.apache.cxf.jaxrs.client.WebClient;
import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
import org.apache.cxf.jaxrs.provider.rx.ObservableReader;
@@ -85,10 +88,35 @@ public class JAXRSReactiveTest extends AbstractBusClientServerTestBase {
@Test
public void testGetHelloWorldJson() throws Exception {
String address = "http://localhost:" + PORT + "/reactive/textJson";
- WebClient wc = WebClient.create(address);
- String text = wc.accept("application/json").get(String.class);
- assertTrue("{\"audience\":\"World\",\"greeting\":\"Hello\"}".equals(text)
- || "{\"greeting\":\"Hello\",\"audience\":\"World\"}".equals(text));
+ WebClient wc = WebClient.create(address,
+ Collections.singletonList(new JacksonJsonProvider())); // registers Jackson as a client-side provider
+ HelloWorldBean bean = wc.accept("application/json").get(HelloWorldBean.class); // reply is read as a bean, so the check no longer depends on JSON property order
+ assertEquals("Hello", bean.getGreeting());
+ assertEquals("World", bean.getAudience());
+ }
+ @Test
+ public void testGetHelloWorldJsonList() throws Exception { // covers the endpoint whose Observable emits one explicit List of beans
+ String address = "http://localhost:" + PORT + "/reactive/textJsonList";
+ doTestGetHelloWorldJsonList(address);
+ }
+ @Test
+ public void testGetHelloWorldJsonImplicitList() throws Exception { // covers the endpoint whose Observable emits two beans individually; same expected reply as the explicit-list case
+ String address = "http://localhost:" + PORT + "/reactive/textJsonImplicitList";
+ doTestGetHelloWorldJsonList(address);
+ }
+ private void doTestGetHelloWorldJsonList(String address) throws Exception { // shared assertion helper: GETs the address as JSON and expects exactly two beans
+ WebClient wc = WebClient.create(address,
+ Collections.singletonList(new JacksonJsonProvider()));
+ WebClient.getConfig(wc).getHttpConduit().getClient().setReceiveTimeout(10000000); // NOTE(review): very large receive timeout (presumably milliseconds - confirm); looks like a debugging aid
+ GenericType<List<HelloWorldBean>> genericResponseType = new GenericType<List<HelloWorldBean>>() {
+ }; // anonymous subclass carries the List<HelloWorldBean> type argument for the get() call below
+
+ List<HelloWorldBean> beans = wc.accept("application/json").get(genericResponseType);
+ assertEquals(2, beans.size());
+ assertEquals("Hello", beans.get(0).getGreeting());
+ assertEquals("World", beans.get(0).getAudience());
+ assertEquals("Ciao", beans.get(1).getGreeting()); // second bean is the one the service modified with setGreeting("Ciao")
+ assertEquals("World", beans.get(1).getAudience());
}
private Observable<String> getObservable(Future<String> future) {
http://git-wip-us.apache.org/repos/asf/cxf/blob/502db47a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java
index 09cbecc..ced133b 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
import org.apache.cxf.Bus;
import org.apache.cxf.BusFactory;
+import org.apache.cxf.interceptor.LoggingOutInterceptor;
import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
import org.apache.cxf.jaxrs.provider.rx.ObservableWriter;
@@ -42,6 +43,7 @@ public class ReactiveServer extends AbstractBusTestServerBase {
JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
sf.setProvider(new ObservableWriter<Object>());
sf.setProvider(new JacksonJsonProvider());
+ sf.getOutInterceptors().add(new LoggingOutInterceptor());
sf.setResourceClasses(ReactiveService.class);
sf.setResourceProvider(ReactiveService.class,
new SingletonResourceProvider(new ReactiveService(), true));
http://git-wip-us.apache.org/repos/asf/cxf/blob/502db47a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java
index f716bbc..5d77969 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java
@@ -20,14 +20,18 @@
package org.apache.cxf.systest.jaxrs.reactive;
+import java.util.Arrays;
+import java.util.List;
+
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
+import org.apache.cxf.jaxrs.provider.rx.AbstractAsyncSubscriber;
+
import rx.Observable;
-import rx.Subscriber;
@Path("/reactive")
@@ -45,33 +49,45 @@ public class ReactiveService {
@Path("textAsync")
public void getTextAsync(@Suspended final AsyncResponse ar) {
Observable.just("Hello, ").map(s -> s + "world!")
- .subscribe(new AsyncResponseSubscriber(ar));
+ .subscribe(new StringAsyncSubscriber(ar));
}
@GET
@Produces("application/json")
@Path("textJson")
- public Observable<HelloWorldBean> getJsonText() {
+ public Observable<HelloWorldBean> getJson() {
return Observable.just(new HelloWorldBean());
}
- private class AsyncResponseSubscriber extends Subscriber<String> {
+ @GET
+ @Produces("application/json")
+ @Path("textJsonImplicitList")
+ public Observable<HelloWorldBean> getJsonImplicitList() { // "implicit list": two beans emitted separately, with no List in the return type
+ HelloWorldBean bean1 = new HelloWorldBean(); // left at HelloWorldBean's initial values (the client test expects Hello/World - presumably the defaults)
+ HelloWorldBean bean2 = new HelloWorldBean();
+ bean2.setGreeting("Ciao");
+ return Observable.just(bean1, bean2);
+ }
+ @GET
+ @Produces("application/json")
+ @Path("textJsonList")
+ public Observable<List<HelloWorldBean>> getJsonList() { // explicit list: the Observable emits a single item that is itself a List of two beans
+ HelloWorldBean bean1 = new HelloWorldBean();
+ HelloWorldBean bean2 = new HelloWorldBean();
+ bean2.setGreeting("Ciao");
+ return Observable.just(Arrays.asList(bean1, bean2));
+ }
+
+ private class StringAsyncSubscriber extends AbstractAsyncSubscriber<String> {
private StringBuilder sb = new StringBuilder();
- private AsyncResponse ar;
-
- AsyncResponseSubscriber(AsyncResponse ar) {
- this.ar = ar;
+ StringAsyncSubscriber(AsyncResponse ar) {
+ super(ar);
}
@Override
public void onCompleted() {
- ar.resume(sb.toString());
- }
-
- @Override
- public void onError(Throwable arg0) {
- // TODO Auto-generated method stub
+ super.resume(sb.toString());
}
@Override