You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by se...@apache.org on 2017/08/08 12:21:53 UTC
cxf git commit: [CXF-6833] Removing Observable providers which do not work asynchronously
Repository: cxf
Updated Branches:
refs/heads/master 925140208 -> 35639ab59
[CXF-6833] Removing Observable providers which do not work asynchronously
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/35639ab5
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/35639ab5
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/35639ab5
Branch: refs/heads/master
Commit: 35639ab59aabf4cace331f49505b699f164df0b1
Parents: 9251402
Author: Sergey Beryozkin <sb...@gmail.com>
Authored: Tue Aug 8 13:21:34 2017 +0100
Committer: Sergey Beryozkin <sb...@gmail.com>
Committed: Tue Aug 8 13:21:34 2017 +0100
----------------------------------------------------------------------
.../cxf/jaxrs/rx/provider/ObservableReader.java | 61 ----------
.../cxf/jaxrs/rx/provider/ObservableWriter.java | 119 -------------------
.../cxf/jaxrs/rx/server/ObservableInvoker.java | 43 +++++++
.../jaxrs/rx/provider/ObservableWriterTest.java | 32 -----
.../jaxrs/reactive/JAXRSObservableTest.java | 21 ----
.../jaxrs/reactive/ObservableServer.java | 4 +-
.../jaxrs/reactive/ObservableService.java | 8 --
7 files changed, 45 insertions(+), 243 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cxf/blob/35639ab5/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/provider/ObservableReader.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/provider/ObservableReader.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/provider/ObservableReader.java
deleted file mode 100644
index c4423ae..0000000
--- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/provider/ObservableReader.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.rx.provider;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.lang.annotation.Annotation;
-import java.lang.reflect.Type;
-
-import javax.ws.rs.ProcessingException;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.MultivaluedMap;
-import javax.ws.rs.ext.MessageBodyReader;
-import javax.ws.rs.ext.Providers;
-
-import org.apache.cxf.jaxrs.utils.InjectionUtils;
-
-import rx.Observable;
-
-public class ObservableReader<T> implements MessageBodyReader<Observable<T>> {
-
- @Context
- private Providers providers;
-
- @Override
- public boolean isReadable(Class<?> arg0, Type arg1, Annotation[] arg2, MediaType arg3) {
- return true;
- }
-
- @Override
- public Observable<T> readFrom(Class<Observable<T>> cls, Type t, Annotation[] anns, MediaType mt,
- MultivaluedMap<String, String> headers, InputStream is)
- throws IOException, WebApplicationException {
- @SuppressWarnings("unchecked")
- Class<T> actualCls = (Class<T>)InjectionUtils.getActualType(t);
- final MessageBodyReader<T> mbr =
- providers.getMessageBodyReader(actualCls, actualCls, anns, mt);
- if (mbr == null) {
- throw new ProcessingException("MBR is null");
- }
- return Observable.just(mbr.readFrom(actualCls, actualCls, anns, mt, headers, is));
- }
-}
http://git-wip-us.apache.org/repos/asf/cxf/blob/35639ab5/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/provider/ObservableWriter.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/provider/ObservableWriter.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/provider/ObservableWriter.java
deleted file mode 100644
index 33d3864..0000000
--- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/provider/ObservableWriter.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.rx.provider;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.lang.annotation.Annotation;
-import java.lang.reflect.Type;
-import java.util.LinkedList;
-import java.util.List;
-
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.MultivaluedMap;
-import javax.ws.rs.ext.MessageBodyWriter;
-import javax.ws.rs.ext.Provider;
-import javax.ws.rs.ext.Providers;
-
-import org.apache.cxf.jaxrs.utils.ExceptionUtils;
-import org.apache.cxf.jaxrs.utils.ParameterizedCollectionType;
-
-import rx.Observable;
-
-@Provider
-public class ObservableWriter<T> implements MessageBodyWriter<Observable<T>> {
-
- @Context
- private Providers providers;
- private boolean writeSingleElementAsList;
-
- @Override
- public long getSize(Observable<T> arg0, Class<?> arg1, Type arg2, Annotation[] arg3, MediaType arg4) {
- // TODO Auto-generated method stub
- return -1;
- }
-
- @Override
- public boolean isWriteable(Class<?> arg0, Type arg1, Annotation[] arg2, MediaType arg3) {
- return true;
- }
-
- @Override
- public void writeTo(Observable<T> obs, Class<?> cls, Type t, Annotation[] anns, MediaType mt,
- MultivaluedMap<String, Object> headers, OutputStream os)
- throws IOException, WebApplicationException {
- List<T> entities = new LinkedList<T>();
- obs.subscribe(value -> entities.add(value),
- throwable -> throwError(throwable));
- if (!entities.isEmpty()) {
-
- if (entities.get(0) instanceof List) {
- List<T> allEntities = new LinkedList<T>();
- for (T obj : entities) {
- @SuppressWarnings("unchecked")
- List<T> listT = (List<T>)obj;
- allEntities.addAll(listT);
- }
- writeToOutputStream(allEntities, anns, mt, headers, os);
- } else if (entities.size() > 1 || writeSingleElementAsList) {
- writeToOutputStream(entities, anns, mt, headers, os);
- } else {
- writeToOutputStream(entities.get(0), anns, mt, headers, os);
- }
- }
- }
-
- private void writeToOutputStream(Object value,
- Annotation[] anns,
- MediaType mt,
- MultivaluedMap<String, Object> headers,
- OutputStream os) {
- Class<?> valueCls = value.getClass();
- Type valueType = null;
- if (value instanceof List) {
- List<?> list = (List<?>)value;
- valueType = new ParameterizedCollectionType(list.isEmpty() ? Object.class : list.get(0).getClass());
- } else {
- valueType = valueCls;
- }
- @SuppressWarnings("unchecked")
- MessageBodyWriter<Object> writer =
- (MessageBodyWriter<Object>)providers.getMessageBodyWriter(valueCls, valueType, anns, mt);
- if (writer == null) {
- throwError(null);
- }
-
- try {
- writer.writeTo(value, valueCls, valueType, anns, mt, headers, os);
- } catch (IOException ex) {
- throwError(ex);
- }
- }
-
- private static void throwError(Throwable cause) {
- throw ExceptionUtils.toInternalServerErrorException(cause, null);
- }
-
- public void setWriteSingleElementAsList(boolean writeSingleElementAsList) {
- this.writeSingleElementAsList = writeSingleElementAsList;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cxf/blob/35639ab5/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/ObservableInvoker.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/ObservableInvoker.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/ObservableInvoker.java
new file mode 100644
index 0000000..59d0ca3
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/ObservableInvoker.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx.server;
+
+import org.apache.cxf.jaxrs.JAXRSInvoker;
+import org.apache.cxf.jaxrs.impl.AsyncResponseImpl;
+import org.apache.cxf.message.Message;
+
+import rx.Observable;
+
+public class ObservableInvoker extends JAXRSInvoker {
+ protected AsyncResponseImpl checkFutureResponse(Message inMessage, Object result) {
+ if (result instanceof Observable) {
+ final Observable<?> obs = (Observable<?>)result;
+ final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
+ obs.subscribe(v -> asyncResponse.resume(v), t -> handleThrowable(asyncResponse, t));
+ return asyncResponse;
+ }
+ return null;
+ }
+
+ private Object handleThrowable(AsyncResponseImpl asyncResponse, Throwable t) {
+ //TODO: if it is a Cancelation exception => asyncResponse.cancel();
+ asyncResponse.resume(t);
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/35639ab5/rt/rs/extensions/rx/src/test/java/org/apache/cxf/jaxrs/rx/provider/ObservableWriterTest.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/test/java/org/apache/cxf/jaxrs/rx/provider/ObservableWriterTest.java b/rt/rs/extensions/rx/src/test/java/org/apache/cxf/jaxrs/rx/provider/ObservableWriterTest.java
deleted file mode 100644
index 045dcec..0000000
--- a/rt/rs/extensions/rx/src/test/java/org/apache/cxf/jaxrs/rx/provider/ObservableWriterTest.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cxf.jaxrs.rx.provider;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-public class ObservableWriterTest extends Assert {
-
-
- @Test
- public void testIsWriteable() {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cxf/blob/35639ab5/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSObservableTest.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSObservableTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSObservableTest.java
index a0d8af8..39d8fd5 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSObservableTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSObservableTest.java
@@ -34,7 +34,6 @@ import org.apache.cxf.jaxrs.client.WebClient;
import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
import org.apache.cxf.jaxrs.rx.client.ObservableRxInvoker;
import org.apache.cxf.jaxrs.rx.client.ObservableRxInvokerProvider;
-import org.apache.cxf.jaxrs.rx.provider.ObservableReader;
import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
import org.junit.BeforeClass;
@@ -67,21 +66,6 @@ public class JAXRSObservableTest extends AbstractBusClientServerTestBase {
}
@Test
- public void testGetHelloWorldTextObservableSync() throws Exception {
- String address = "http://localhost:" + PORT + "/observable/text";
- WebClient wc = WebClient.create(address, Collections.singletonList(
- new ObservableReader<Object>()));
- GenericType<Observable<String>> genericResponseType =
- new GenericType<Observable<String>>() {
- };
- Observable<String> obs = wc.accept("text/plain").get(genericResponseType);
- obs.subscribe(s -> assertResponse(s));
- }
-
- private void assertResponse(String s) {
- assertEquals("Hello, world!", s);
- }
- @Test
public void testGetHelloWorldJson() throws Exception {
String address = "http://localhost:" + PORT + "/observable/textJson";
WebClient wc = WebClient.create(address,
@@ -96,11 +80,6 @@ public class JAXRSObservableTest extends AbstractBusClientServerTestBase {
doTestGetHelloWorldJsonList(address);
}
@Test
- public void testGetHelloWorldJsonImplicitList() throws Exception {
- String address = "http://localhost:" + PORT + "/observable/textJsonImplicitList";
- doTestGetHelloWorldJsonList(address);
- }
- @Test
public void testGetHelloWorldJsonImplicitListAsync() throws Exception {
String address = "http://localhost:" + PORT + "/observable/textJsonImplicitListAsync";
doTestGetHelloWorldJsonList(address);
http://git-wip-us.apache.org/repos/asf/cxf/blob/35639ab5/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ObservableServer.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ObservableServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ObservableServer.java
index 825dc88..03f89ef 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ObservableServer.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ObservableServer.java
@@ -29,7 +29,7 @@ import org.apache.cxf.ext.logging.LoggingOutInterceptor;
import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
import org.apache.cxf.jaxrs.provider.StreamingResponseProvider;
-import org.apache.cxf.jaxrs.rx.provider.ObservableWriter;
+import org.apache.cxf.jaxrs.rx.server.ObservableInvoker;
import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
@@ -45,7 +45,7 @@ public class ObservableServer extends AbstractBusTestServerBase {
// Make sure default JSONProvider is not loaded
bus.setProperty("skip.default.json.provider.registration", true);
JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
- sf.setProvider(new ObservableWriter<Object>());
+ sf.setInvoker(new ObservableInvoker());
sf.setProvider(new JacksonJsonProvider());
StreamingResponseProvider<HelloWorldBean> streamProvider = new StreamingResponseProvider<HelloWorldBean>();
streamProvider.setProduceMediaTypes(Collections.singletonList("application/json"));
http://git-wip-us.apache.org/repos/asf/cxf/blob/35639ab5/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ObservableService.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ObservableService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ObservableService.java
index 4aebd1b..00783fd 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ObservableService.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ObservableService.java
@@ -65,14 +65,6 @@ public class ObservableService {
@GET
@Produces("application/json")
- @Path("textJsonImplicitList")
- public Observable<HelloWorldBean> getJsonImplicitList() {
- HelloWorldBean bean1 = new HelloWorldBean();
- HelloWorldBean bean2 = new HelloWorldBean("Ciao");
- return Observable.just(bean1, bean2);
- }
- @GET
- @Produces("application/json")
@Path("textJsonImplicitListAsync")
public void getJsonImplicitListAsync(@Suspended AsyncResponse ar) {
final HelloWorldBean bean1 = new HelloWorldBean();