You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by se...@apache.org on 2017/09/01 10:01:08 UTC

cxf git commit: Having a single invoker only for RxJava2

Repository: cxf
Updated Branches:
  refs/heads/master 0f3e34689 -> 18dd0e1c7


Having a single invoker only for RxJava2


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/18dd0e1c
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/18dd0e1c
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/18dd0e1c

Branch: refs/heads/master
Commit: 18dd0e1c754fd30c9ab479cf123db5ab8f6810e7
Parents: 0f3e346
Author: Sergey Beryozkin <sb...@gmail.com>
Authored: Fri Sep 1 11:00:53 2017 +0100
Committer: Sergey Beryozkin <sb...@gmail.com>
Committed: Fri Sep 1 11:00:53 2017 +0100

----------------------------------------------------------------------
 .../cxf/jaxrs/rx2/server/FlowableInvoker.java   | 43 ---------------
 .../cxf/jaxrs/rx2/server/ObservableInvoker.java | 43 ---------------
 .../cxf/jaxrs/rx2/server/ReactiveIOInvoker.java | 57 ++++++++++++++++++++
 .../jaxrs/reactive/RxJava2FlowableServer.java   |  4 +-
 .../jaxrs/reactive/RxJava2ObservableServer.java |  4 +-
 5 files changed, 61 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/18dd0e1c/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/FlowableInvoker.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/FlowableInvoker.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/FlowableInvoker.java
deleted file mode 100644
index 1ff7491..0000000
--- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/FlowableInvoker.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.rx2.server;
-
-import org.apache.cxf.jaxrs.JAXRSInvoker;
-import org.apache.cxf.jaxrs.impl.AsyncResponseImpl;
-import org.apache.cxf.message.Message;
-
-import io.reactivex.Flowable;
-
-public class FlowableInvoker extends JAXRSInvoker {
-    protected AsyncResponseImpl checkFutureResponse(Message inMessage, Object result) {
-        if (result instanceof Flowable) {
-            final Flowable<?> f = (Flowable<?>)result;
-            final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
-            f.subscribe(v -> asyncResponse.resume(v), t -> handleThrowable(asyncResponse, t));
-            return asyncResponse;
-        }
-        return null;
-    }
-
-    private Object handleThrowable(AsyncResponseImpl asyncResponse, Throwable t) {
-        //TODO: if it is a Cancelation exception => asyncResponse.cancel(); 
-        asyncResponse.resume(t);
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/18dd0e1c/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ObservableInvoker.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ObservableInvoker.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ObservableInvoker.java
deleted file mode 100644
index 8047c6a..0000000
--- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ObservableInvoker.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.rx2.server;
-
-import org.apache.cxf.jaxrs.JAXRSInvoker;
-import org.apache.cxf.jaxrs.impl.AsyncResponseImpl;
-import org.apache.cxf.message.Message;
-
-import io.reactivex.Observable;
-
-public class ObservableInvoker extends JAXRSInvoker {
-    protected AsyncResponseImpl checkFutureResponse(Message inMessage, Object result) {
-        if (result instanceof Observable) {
-            final Observable<?> obs = (Observable<?>)result;
-            final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
-            obs.subscribe(v -> asyncResponse.resume(v), t -> handleThrowable(asyncResponse, t));
-            return asyncResponse;
-        }
-        return null;
-    }
-
-    private Object handleThrowable(AsyncResponseImpl asyncResponse, Throwable t) {
-        //TODO: if it is a Cancelation exception => asyncResponse.cancel(); 
-        asyncResponse.resume(t);
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/18dd0e1c/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java
new file mode 100644
index 0000000..c529d4a
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx2.server;
+
+import org.apache.cxf.jaxrs.JAXRSInvoker;
+import org.apache.cxf.jaxrs.impl.AsyncResponseImpl;
+import org.apache.cxf.message.Message;
+
+import io.reactivex.Flowable;
+import io.reactivex.Observable;
+
+//Work in Progress
+public class ReactiveIOInvoker extends JAXRSInvoker {
+    protected AsyncResponseImpl checkFutureResponse(Message inMessage, Object result) {
+        if (result instanceof Flowable) {
+            return handleFlowable(inMessage, (Flowable<?>)result);
+        } else if (result instanceof Observable) {
+            return handleObservable(inMessage, (Observable<?>)result);
+        } else {
+            return null;
+        }
+    }
+    
+    protected AsyncResponseImpl handleFlowable(Message inMessage, Flowable<?> f) {
+        final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
+        f.subscribe(v -> asyncResponse.resume(v), t -> handleThrowable(asyncResponse, t));
+        return asyncResponse;
+    }
+    
+    protected AsyncResponseImpl handleObservable(Message inMessage, Observable<?> obs) {
+        final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
+        obs.subscribe(v -> asyncResponse.resume(v), t -> handleThrowable(asyncResponse, t));
+        return asyncResponse;
+    }
+
+    private Object handleThrowable(AsyncResponseImpl asyncResponse, Throwable t) {
+        //TODO: if it is a Cancelation exception => asyncResponse.cancel(); 
+        asyncResponse.resume(t);
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/18dd0e1c/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java
index fe41958..8558bed 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java
@@ -29,7 +29,7 @@ import org.apache.cxf.ext.logging.LoggingOutInterceptor;
 import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
 import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
 import org.apache.cxf.jaxrs.provider.StreamingResponseProvider;
-import org.apache.cxf.jaxrs.rx2.server.FlowableInvoker;
+import org.apache.cxf.jaxrs.rx2.server.ReactiveIOInvoker;
 import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
 
 
@@ -45,7 +45,7 @@ public class RxJava2FlowableServer extends AbstractBusTestServerBase {
         // Make sure default JSONProvider is not loaded
         bus.setProperty("skip.default.json.provider.registration", true);
         JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
-        sf.setInvoker(new FlowableInvoker());
+        sf.setInvoker(new ReactiveIOInvoker());
         sf.setProvider(new JacksonJsonProvider());
         StreamingResponseProvider<HelloWorldBean> streamProvider = new StreamingResponseProvider<HelloWorldBean>();
         streamProvider.setProduceMediaTypes(Collections.singletonList("application/json"));

http://git-wip-us.apache.org/repos/asf/cxf/blob/18dd0e1c/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableServer.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableServer.java
index 48df030..a8849d1 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableServer.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableServer.java
@@ -26,7 +26,7 @@ import org.apache.cxf.BusFactory;
 import org.apache.cxf.ext.logging.LoggingOutInterceptor;
 import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
 import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
-import org.apache.cxf.jaxrs.rx2.server.ObservableInvoker;
+import org.apache.cxf.jaxrs.rx2.server.ReactiveIOInvoker;
 import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
 
 
@@ -42,7 +42,7 @@ public class RxJava2ObservableServer extends AbstractBusTestServerBase {
         // Make sure default JSONProvider is not loaded
         bus.setProperty("skip.default.json.provider.registration", true);
         JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
-        sf.setInvoker(new ObservableInvoker());
+        sf.setInvoker(new ReactiveIOInvoker());
         sf.setProvider(new JacksonJsonProvider());
         sf.getOutInterceptors().add(new LoggingOutInterceptor());
         sf.setResourceClasses(RxJava2ObservableService.class);