You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2016/09/05 01:10:09 UTC

[02/23] cxf git commit: [CXF-6833] Better support for implicit lists, simpler async subscription code

[CXF-6833] Better support for implicit lists, simpler async subscription code


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/502db47a
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/502db47a
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/502db47a

Branch: refs/heads/master-jaxrs-2.1
Commit: 502db47a7c520767da2977376be5cf2fce3f56af
Parents: 8e753ad
Author: Sergey Beryozkin <sb...@gmail.com>
Authored: Tue Aug 30 18:02:32 2016 +0100
Committer: Sergey Beryozkin <sb...@gmail.com>
Committed: Tue Aug 30 18:02:32 2016 +0100

----------------------------------------------------------------------
 .../provider/rx/AbstractAsyncSubscriber.java    | 44 ++++++++++++++++++
 .../cxf/jaxrs/provider/rx/ObservableWriter.java | 47 +++++++++++++++++---
 .../jaxrs/reactive/JAXRSReactiveTest.java       | 36 +++++++++++++--
 .../systest/jaxrs/reactive/ReactiveServer.java  |  2 +
 .../systest/jaxrs/reactive/ReactiveService.java | 44 ++++++++++++------
 5 files changed, 149 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/502db47a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/AbstractAsyncSubscriber.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/AbstractAsyncSubscriber.java b/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/AbstractAsyncSubscriber.java
new file mode 100644
index 0000000..ae4459c
--- /dev/null
+++ b/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/AbstractAsyncSubscriber.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.provider.rx;
+
+import javax.ws.rs.container.AsyncResponse;
+
+import rx.Subscriber;
+
+public abstract class AbstractAsyncSubscriber<T> extends Subscriber<T> {
+    
+    private AsyncResponse ar;
+    
+    protected AbstractAsyncSubscriber(AsyncResponse ar) {
+        this.ar = ar;
+    }
+    public void resume(T response) {
+        ar.resume(response);
+    }
+
+    @Override
+    public void onError(Throwable t) {
+        ar.resume(t);
+    }
+    
+    protected AsyncResponse getAsyncResponse() {
+        return ar;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/502db47a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ObservableWriter.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ObservableWriter.java b/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ObservableWriter.java
index 929709b..6317506 100644
--- a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ObservableWriter.java
+++ b/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ObservableWriter.java
@@ -22,22 +22,28 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.lang.annotation.Annotation;
 import java.lang.reflect.Type;
+import java.util.LinkedList;
+import java.util.List;
 
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.MultivaluedMap;
 import javax.ws.rs.ext.MessageBodyWriter;
+import javax.ws.rs.ext.Provider;
 import javax.ws.rs.ext.Providers;
 
 import org.apache.cxf.jaxrs.utils.ExceptionUtils;
+import org.apache.cxf.jaxrs.utils.ParameterizedCollectionType;
 
 import rx.Observable;
 
+@Provider
 public class ObservableWriter<T> implements MessageBodyWriter<Observable<T>> {
     
     @Context
     private Providers providers;
+    private boolean writeSingleElementAsList;
     
     @Override
     public long getSize(Observable<T> arg0, Class<?> arg1, Type arg2, Annotation[] arg3, MediaType arg4) {
@@ -54,24 +60,49 @@ public class ObservableWriter<T> implements MessageBodyWriter<Observable<T>> {
     public void writeTo(Observable<T> obs, Class<?> cls, Type t, Annotation[] anns, MediaType mt,
                         MultivaluedMap<String, Object> headers, OutputStream os)
                             throws IOException, WebApplicationException {
-        obs.subscribe(value -> writeToOutputStream(value, anns, mt, headers, os),
-            throwable -> throwError(throwable));   
+        List<T> entities = new LinkedList<T>();
+        obs.subscribe(value -> entities.add(value),
+            throwable -> throwError(throwable));
+        if (!entities.isEmpty()) {
+            
+            if (entities.get(0) instanceof List) {
+                List<T> allEntities = new LinkedList<T>();
+                for (T obj : entities) {
+                    @SuppressWarnings("unchecked")
+                    List<T> listT = (List<T>)obj;
+                    allEntities.addAll(listT);
+                }
+                writeToOutputStream(allEntities, anns, mt, headers, os);
+            } else if (entities.size() > 1 || writeSingleElementAsList) {
+                writeToOutputStream(entities, anns, mt, headers, os);
+            } else {
+                writeToOutputStream(entities.get(0), anns, mt, headers, os);
+            }
+        }
     }
 
-    private void writeToOutputStream(T value,
+    private void writeToOutputStream(Object value,
                                      Annotation[] anns,
                                      MediaType mt,
                                      MultivaluedMap<String, Object> headers, 
                                      OutputStream os) {
+        Class<?> valueCls = value.getClass();
+        Type valueType = null;
+        if (value instanceof List) {
+            List<?> list = (List<?>)value;
+            valueType = new ParameterizedCollectionType(list.isEmpty() ? Object.class : list.get(0).getClass());
+        } else {
+            valueType = valueCls;
+        }
         @SuppressWarnings("unchecked")
-        MessageBodyWriter<T> writer = 
-            (MessageBodyWriter<T>)providers.getMessageBodyWriter(value.getClass(), value.getClass(), anns, mt);
+        MessageBodyWriter<Object> writer = 
+            (MessageBodyWriter<Object>)providers.getMessageBodyWriter(valueCls, valueType, anns, mt);
         if (writer == null) {
             throwError(null);
         }
     
         try {
-            writer.writeTo(value, value.getClass(), value.getClass(), anns, mt, headers, os);    
+            writer.writeTo(value, valueCls, valueType, anns, mt, headers, os);    
         } catch (IOException ex) {
             throwError(ex);
         }
@@ -81,4 +112,8 @@ public class ObservableWriter<T> implements MessageBodyWriter<Observable<T>> {
         throw ExceptionUtils.toInternalServerErrorException(cause, null);
     }
 
+    public void setWriteSingleElementAsList(boolean writeSingleElementAsList) {
+        this.writeSingleElementAsList = writeSingleElementAsList;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/502db47a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java
index 535831d..f82d139 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java
@@ -20,10 +20,13 @@
 package org.apache.cxf.systest.jaxrs.reactive;
 
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.Future;
 
 import javax.ws.rs.core.GenericType;
 
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
 import org.apache.cxf.jaxrs.client.WebClient;
 import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
 import org.apache.cxf.jaxrs.provider.rx.ObservableReader;
@@ -85,10 +88,35 @@ public class JAXRSReactiveTest extends AbstractBusClientServerTestBase {
     @Test
     public void testGetHelloWorldJson() throws Exception {
         String address = "http://localhost:" + PORT + "/reactive/textJson";
-        WebClient wc = WebClient.create(address);
-        String text = wc.accept("application/json").get(String.class);
-        assertTrue("{\"audience\":\"World\",\"greeting\":\"Hello\"}".equals(text)
-                   || "{\"greeting\":\"Hello\",\"audience\":\"World\"}".equals(text));
+        WebClient wc = WebClient.create(address,
+                                        Collections.singletonList(new JacksonJsonProvider()));
+        HelloWorldBean bean = wc.accept("application/json").get(HelloWorldBean.class);
+        assertEquals("Hello", bean.getGreeting());
+        assertEquals("World", bean.getAudience());
+    }
+    @Test
+    public void testGetHelloWorldJsonList() throws Exception {
+        String address = "http://localhost:" + PORT + "/reactive/textJsonList";
+        doTestGetHelloWorldJsonList(address);
+    }
+    @Test
+    public void testGetHelloWorldJsonImplicitList() throws Exception {
+        String address = "http://localhost:" + PORT + "/reactive/textJsonImplicitList";
+        doTestGetHelloWorldJsonList(address);
+    }
+    private void doTestGetHelloWorldJsonList(String address) throws Exception {
+        WebClient wc = WebClient.create(address,
+                                        Collections.singletonList(new JacksonJsonProvider()));
+        WebClient.getConfig(wc).getHttpConduit().getClient().setReceiveTimeout(10000000);
+        GenericType<List<HelloWorldBean>> genericResponseType = new GenericType<List<HelloWorldBean>>() {        
+        };
+        
+        List<HelloWorldBean> beans = wc.accept("application/json").get(genericResponseType);
+        assertEquals(2, beans.size());
+        assertEquals("Hello", beans.get(0).getGreeting());
+        assertEquals("World", beans.get(0).getAudience());
+        assertEquals("Ciao", beans.get(1).getGreeting());
+        assertEquals("World", beans.get(1).getAudience());
     }
     
     private Observable<String> getObservable(Future<String> future) {

http://git-wip-us.apache.org/repos/asf/cxf/blob/502db47a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java
index 09cbecc..ced133b 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
 
 import org.apache.cxf.Bus;
 import org.apache.cxf.BusFactory;
+import org.apache.cxf.interceptor.LoggingOutInterceptor;
 import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
 import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
 import org.apache.cxf.jaxrs.provider.rx.ObservableWriter;
@@ -42,6 +43,7 @@ public class ReactiveServer extends AbstractBusTestServerBase {
         JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
         sf.setProvider(new ObservableWriter<Object>());
         sf.setProvider(new JacksonJsonProvider());
+        sf.getOutInterceptors().add(new LoggingOutInterceptor());
         sf.setResourceClasses(ReactiveService.class);
         sf.setResourceProvider(ReactiveService.class,
                                new SingletonResourceProvider(new ReactiveService(), true));

http://git-wip-us.apache.org/repos/asf/cxf/blob/502db47a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java
index f716bbc..5d77969 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java
@@ -20,14 +20,18 @@
 package org.apache.cxf.systest.jaxrs.reactive;
 
 
+import java.util.Arrays;
+import java.util.List;
+
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
 
+import org.apache.cxf.jaxrs.provider.rx.AbstractAsyncSubscriber;
+
 import rx.Observable;
-import rx.Subscriber;
 
 
 @Path("/reactive")
@@ -45,33 +49,45 @@ public class ReactiveService {
     @Path("textAsync")
     public void getTextAsync(@Suspended final AsyncResponse ar) {
         Observable.just("Hello, ").map(s -> s + "world!")
-            .subscribe(new AsyncResponseSubscriber(ar));
+            .subscribe(new StringAsyncSubscriber(ar));
         
     }
     
     @GET
     @Produces("application/json")
     @Path("textJson")
-    public Observable<HelloWorldBean> getJsonText() {
+    public Observable<HelloWorldBean> getJson() {
         return Observable.just(new HelloWorldBean());
     }
     
-    private class AsyncResponseSubscriber extends Subscriber<String> {
+    @GET
+    @Produces("application/json")
+    @Path("textJsonImplicitList")
+    public Observable<HelloWorldBean> getJsonImplicitList() {
+        HelloWorldBean bean1 = new HelloWorldBean();
+        HelloWorldBean bean2 = new HelloWorldBean();
+        bean2.setGreeting("Ciao");
+        return Observable.just(bean1, bean2);
+    }
+    @GET
+    @Produces("application/json")
+    @Path("textJsonList")
+    public Observable<List<HelloWorldBean>> getJsonList() {
+        HelloWorldBean bean1 = new HelloWorldBean();
+        HelloWorldBean bean2 = new HelloWorldBean();
+        bean2.setGreeting("Ciao");
+        return Observable.just(Arrays.asList(bean1, bean2));
+    }
+    
+    private class StringAsyncSubscriber extends AbstractAsyncSubscriber<String> {
         
         private StringBuilder sb = new StringBuilder();
-        private AsyncResponse ar;
-        
-        AsyncResponseSubscriber(AsyncResponse ar) {
-            this.ar = ar;
+        StringAsyncSubscriber(AsyncResponse ar) {
+            super(ar);
         }
         @Override
         public void onCompleted() {
-            ar.resume(sb.toString());
-        }
-
-        @Override
-        public void onError(Throwable arg0) {
-            // TODO Auto-generated method stub
+            super.resume(sb.toString());
         }
 
         @Override