You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@cxf.apache.org by GitBox <gi...@apache.org> on 2018/02/15 11:34:49 UTC

[GitHub] johnament closed pull request #381: [CXF-7642] Introduce separate modules for RxJava & RxJava2

johnament closed pull request #381: [CXF-7642] Introduce separate modules for RxJava & RxJava2
URL: https://github.com/apache/cxf/pull/381
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/ext/AbstractStreamingResponseExtension.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/ext/AbstractStreamingResponseExtension.java
new file mode 100644
index 00000000000..e7dfc8a46d6
--- /dev/null
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/ext/AbstractStreamingResponseExtension.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.ext;
+
+import java.util.Collections;
+
+import javax.ws.rs.core.MediaType;
+
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.jaxrs.provider.StreamingResponseProvider;
+import org.apache.cxf.service.invoker.Invoker;
+
+public abstract class AbstractStreamingResponseExtension implements JAXRSServerFactoryCustomizationExtension {
+    protected abstract Invoker createInvoker(JAXRSServerFactoryBean bean);
+
+    @Override
+    public final void customize(JAXRSServerFactoryBean bean) {
+        bean.setInvoker(createInvoker(bean));
+        StreamingResponseProvider<Object> streamProvider = new StreamingResponseProvider<>();
+        streamProvider.setProduceMediaTypes(Collections.singletonList(MediaType.APPLICATION_JSON));
+        bean.setProvider(streamProvider);
+    }
+}
diff --git a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorCustomizer.java b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorCustomizer.java
new file mode 100644
index 00000000000..a57b57484dd
--- /dev/null
+++ b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorCustomizer.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.reactor.server;
+
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.jaxrs.ext.AbstractStreamingResponseExtension;
+import org.apache.cxf.service.invoker.Invoker;
+
+public class ReactorCustomizer extends AbstractStreamingResponseExtension {
+    @Override
+    protected Invoker createInvoker(JAXRSServerFactoryBean bean) {
+        Boolean useStreamingSubscriber = (Boolean)bean.getProperties(true)
+                .getOrDefault("useStreamingSubscriber", null);
+        ReactorInvoker invoker = new ReactorInvoker();
+        if (useStreamingSubscriber != null) {
+            invoker.setUseStreamingSubscriberIfPossible(useStreamingSubscriber);
+        }
+        return invoker;
+    }
+}
\ No newline at end of file
diff --git a/rt/rs/extensions/reactor/src/main/resources/META-INF/services/org.apache.cxf.jaxrs.ext.JAXRSServerFactoryCustomizationExtension b/rt/rs/extensions/reactor/src/main/resources/META-INF/services/org.apache.cxf.jaxrs.ext.JAXRSServerFactoryCustomizationExtension
new file mode 100644
index 00000000000..0bca2eb9a08
--- /dev/null
+++ b/rt/rs/extensions/reactor/src/main/resources/META-INF/services/org.apache.cxf.jaxrs.ext.JAXRSServerFactoryCustomizationExtension
@@ -0,0 +1 @@
+org.apache.cxf.jaxrs.reactor.server.ReactorCustomizer
\ No newline at end of file
diff --git a/rt/rs/extensions/rx/pom.xml b/rt/rs/extensions/rx/pom.xml
index 2e3df9ea030..c016384f682 100644
--- a/rt/rs/extensions/rx/pom.xml
+++ b/rt/rs/extensions/rx/pom.xml
@@ -45,25 +45,6 @@
           <groupId>io.reactivex</groupId>
           <artifactId>rxjava</artifactId>
           <scope>provided</scope>
-          <optional>true</optional>
-        </dependency>
-        <dependency>
-          <groupId>io.reactivex.rxjava2</groupId>
-          <artifactId>rxjava</artifactId>
-          <scope>provided</scope>
-          <optional>true</optional>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.cxf</groupId>
-            <artifactId>cxf-rt-rs-extension-reactivestreams</artifactId>
-            <version>${project.version}</version>
-            <scope>provided</scope>
-            <optional>true</optional>
-        </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
         </dependency>
     </dependencies>
 </project>
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/ObservableCustomizer.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/ObservableCustomizer.java
new file mode 100644
index 00000000000..55bf3b9706d
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/ObservableCustomizer.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx.server;
+
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.jaxrs.ext.AbstractStreamingResponseExtension;
+import org.apache.cxf.service.invoker.Invoker;
+
+public class ObservableCustomizer extends AbstractStreamingResponseExtension {
+    @Override
+    protected Invoker createInvoker(JAXRSServerFactoryBean bean) {
+        return new ObservableInvoker();
+    }
+}
diff --git a/rt/rs/extensions/rx/src/main/resources/META-INF/services/org.apache.cxf.jaxrs.ext.JAXRSServerFactoryCustomizationExtension b/rt/rs/extensions/rx/src/main/resources/META-INF/services/org.apache.cxf.jaxrs.ext.JAXRSServerFactoryCustomizationExtension
new file mode 100644
index 00000000000..bb0e3d186f0
--- /dev/null
+++ b/rt/rs/extensions/rx/src/main/resources/META-INF/services/org.apache.cxf.jaxrs.ext.JAXRSServerFactoryCustomizationExtension
@@ -0,0 +1 @@
+org.apache.cxf.jaxrs.rx.server.ObservableCustomizer
\ No newline at end of file
diff --git a/rt/rs/extensions/rx2/pom.xml b/rt/rs/extensions/rx2/pom.xml
new file mode 100644
index 00000000000..74f97642336
--- /dev/null
+++ b/rt/rs/extensions/rx2/pom.xml
@@ -0,0 +1,55 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied. See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>cxf-rt-rs-extension-rx2</artifactId>
+    <packaging>bundle</packaging>
+    <name>Apache CXF JAX-RS Extensions: RxJava2</name>
+    <description>Apache CXF JAX-RS Extensions: RxJava2</description>
+    <url>http://cxf.apache.org</url>
+    <parent>
+        <groupId>org.apache.cxf</groupId>
+        <artifactId>cxf-parent</artifactId>
+        <version>3.2.3-SNAPSHOT</version>
+        <relativePath>../../../../parent/pom.xml</relativePath>
+    </parent>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-frontend-jaxrs</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-rs-client</artifactId>
+            <version>${project.version}</version>
+        </dependency> 
+        <dependency>
+          <groupId>io.reactivex.rxjava2</groupId>
+          <artifactId>rxjava</artifactId>
+          <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-rs-extension-reactivestreams</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvoker.java b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvoker.java
similarity index 100%
rename from rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvoker.java
rename to rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvoker.java
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerImpl.java b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerImpl.java
similarity index 100%
rename from rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerImpl.java
rename to rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerImpl.java
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerProvider.java b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerProvider.java
similarity index 100%
rename from rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerProvider.java
rename to rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerProvider.java
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvoker.java b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvoker.java
similarity index 100%
rename from rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvoker.java
rename to rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvoker.java
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerImpl.java b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerImpl.java
similarity index 100%
rename from rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerImpl.java
rename to rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerImpl.java
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerProvider.java b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerProvider.java
similarity index 100%
rename from rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerProvider.java
rename to rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerProvider.java
diff --git a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOCustomizer.java b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOCustomizer.java
new file mode 100644
index 00000000000..e57bb39a959
--- /dev/null
+++ b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOCustomizer.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx2.server;
+
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.jaxrs.ext.AbstractStreamingResponseExtension;
+import org.apache.cxf.service.invoker.Invoker;
+
+public class ReactiveIOCustomizer extends AbstractStreamingResponseExtension {
+    @Override
+    protected Invoker createInvoker(JAXRSServerFactoryBean bean) {
+        Boolean useStreamingSubscriber = (Boolean)bean.getProperties(true)
+                .getOrDefault("useStreamingSubscriber", null);
+        ReactiveIOInvoker invoker = new ReactiveIOInvoker();
+        if (useStreamingSubscriber != null) {
+            invoker.setUseStreamingSubscriberIfPossible(useStreamingSubscriber);
+        }
+        return invoker;
+    }
+}
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java
similarity index 92%
rename from rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java
rename to rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java
index 13092dc0314..e8871f45446 100644
--- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java
+++ b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java
@@ -41,14 +41,14 @@ protected AsyncResponseImpl checkFutureResponse(Message inMessage, Object result
     
     protected AsyncResponseImpl handleSingle(Message inMessage, Single<?> single) {
         final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
-        single.subscribe(v -> asyncResponse.resume(v), t -> handleThrowable(asyncResponse, t));
+        single.subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t));
         return asyncResponse;
     }
 
     protected AsyncResponseImpl handleFlowable(Message inMessage, Flowable<?> f) {
         final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
         if (!isStreamingSubscriberUsed(f, asyncResponse, inMessage)) {
-            f.subscribe(v -> asyncResponse.resume(v), t -> handleThrowable(asyncResponse, t));
+            f.subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t));
         }
         return asyncResponse;
     }
diff --git a/rt/rs/extensions/rx2/src/main/resources/META-INF/services/org.apache.cxf.jaxrs.ext.JAXRSServerFactoryCustomizationExtension b/rt/rs/extensions/rx2/src/main/resources/META-INF/services/org.apache.cxf.jaxrs.ext.JAXRSServerFactoryCustomizationExtension
new file mode 100644
index 00000000000..7fd3d78e82d
--- /dev/null
+++ b/rt/rs/extensions/rx2/src/main/resources/META-INF/services/org.apache.cxf.jaxrs.ext.JAXRSServerFactoryCustomizationExtension
@@ -0,0 +1 @@
+org.apache.cxf.jaxrs.rx2.server.ReactiveIOCustomizer
\ No newline at end of file
diff --git a/rt/rs/pom.xml b/rt/rs/pom.xml
index 4398491fb37..debbe21497d 100644
--- a/rt/rs/pom.xml
+++ b/rt/rs/pom.xml
@@ -38,6 +38,7 @@
         <module>extensions/providers</module>
         <module>extensions/search</module>
         <module>extensions/rx</module>
+        <module>extensions/rx2</module>
         <module>extensions/reactor</module>
         <module>extensions/reactivestreams</module>
         <module>security</module>
diff --git a/systests/jaxrs/pom.xml b/systests/jaxrs/pom.xml
index 6106cbcfcde..283964c91b7 100644
--- a/systests/jaxrs/pom.xml
+++ b/systests/jaxrs/pom.xml
@@ -322,6 +322,11 @@
             <artifactId>cxf-rt-rs-extension-rx</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-rs-extension-rx2</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.cxf</groupId>
             <artifactId>cxf-rt-rs-extension-reactor</artifactId>
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java
index 5063b5e928a..2b18c839a7c 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java
@@ -19,8 +19,6 @@
 
 package org.apache.cxf.systest.jaxrs.reactive;
 
-import java.util.Collections;
-
 import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
 
 import org.apache.cxf.Bus;
@@ -28,8 +26,7 @@
 import org.apache.cxf.ext.logging.LoggingOutInterceptor;
 import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
 import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
-import org.apache.cxf.jaxrs.provider.StreamingResponseProvider;
-import org.apache.cxf.jaxrs.rx2.server.ReactiveIOInvoker;
+import org.apache.cxf.jaxrs.rx2.server.ReactiveIOCustomizer;
 import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
 
 
@@ -52,13 +49,9 @@ protected void run() {
     private JAXRSServerFactoryBean createFactoryBean(Bus bus, boolean useStreamingSubscriber,
                                                      String relAddress) {
         JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
-        ReactiveIOInvoker invoker = new ReactiveIOInvoker();
-        invoker.setUseStreamingSubscriberIfPossible(useStreamingSubscriber);
-        sf.setInvoker(invoker);
+        sf.getProperties(true).put("useStreamingSubscriber", useStreamingSubscriber);
         sf.setProvider(new JacksonJsonProvider());
-        StreamingResponseProvider<HelloWorldBean> streamProvider = new StreamingResponseProvider<HelloWorldBean>();
-        streamProvider.setProduceMediaTypes(Collections.singletonList("application/json"));
-        sf.setProvider(streamProvider);
+        new ReactiveIOCustomizer().customize(sf);
         sf.getOutInterceptors().add(new LoggingOutInterceptor());
         sf.setResourceClasses(RxJava2FlowableService.class);
         sf.setResourceProvider(RxJava2FlowableService.class,
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaObservableServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaObservableServer.java
index 85d576d6791..15759b11b52 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaObservableServer.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaObservableServer.java
@@ -19,8 +19,6 @@
 
 package org.apache.cxf.systest.jaxrs.reactive;
 
-import java.util.Collections;
-
 import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
 
 import org.apache.cxf.Bus;
@@ -28,8 +26,7 @@
 import org.apache.cxf.ext.logging.LoggingOutInterceptor;
 import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
 import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
-import org.apache.cxf.jaxrs.provider.StreamingResponseProvider;
-import org.apache.cxf.jaxrs.rx.server.ObservableInvoker;
+import org.apache.cxf.jaxrs.rx.server.ObservableCustomizer;
 import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
 
 
@@ -45,11 +42,8 @@ protected void run() {
         // Make sure default JSONProvider is not loaded
         bus.setProperty("skip.default.json.provider.registration", true);
         JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
-        sf.setInvoker(new ObservableInvoker());
         sf.setProvider(new JacksonJsonProvider());
-        StreamingResponseProvider<HelloWorldBean> streamProvider = new StreamingResponseProvider<HelloWorldBean>();
-        streamProvider.setProduceMediaTypes(Collections.singletonList("application/json"));
-        sf.setProvider(streamProvider);
+        new ObservableCustomizer().customize(sf);
         sf.getOutInterceptors().add(new LoggingOutInterceptor());
         sf.setResourceClasses(RxJavaObservableService.class);
         sf.setResourceProvider(RxJavaObservableService.class,
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java
index a3ea3deec4d..d66660cdc27 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java
@@ -77,6 +77,7 @@ public void testTextJsonImplicitListAsyncStream() throws Exception {
                 .doOnNext(helloWorldBean -> holder.value = helloWorldBean)
                 .subscribe();
         Thread.sleep(500);
+        assertNotNull(holder.value);
         assertEquals("Hello", holder.value.getGreeting());
         assertEquals("World", holder.value.getAudience());
     }
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/ReactorServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/ReactorServer.java
index dac4c2c219b..8965073dd0d 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/ReactorServer.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/ReactorServer.java
@@ -19,8 +19,6 @@
 
 package org.apache.cxf.systest.jaxrs.reactor;
 
-import java.util.Collections;
-
 import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
 
 import org.apache.cxf.Bus;
@@ -28,8 +26,7 @@
 import org.apache.cxf.ext.logging.LoggingOutInterceptor;
 import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
 import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
-import org.apache.cxf.jaxrs.provider.StreamingResponseProvider;
-import org.apache.cxf.jaxrs.reactor.server.ReactorInvoker;
+import org.apache.cxf.jaxrs.reactor.server.ReactorCustomizer;
 import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
 
 public class ReactorServer extends AbstractBusTestServerBase {
@@ -44,13 +41,9 @@ protected void run() {
         // Make sure default JSONProvider is not loaded
         bus.setProperty("skip.default.json.provider.registration", true);
         JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
-        ReactorInvoker invoker = new ReactorInvoker();
-        invoker.setUseStreamingSubscriberIfPossible(false);
-        sf.setInvoker(invoker);
+        sf.getProperties(true).put("useStreamingSubscriber", false);
         sf.setProvider(new JacksonJsonProvider());
-        StreamingResponseProvider<HelloWorldBean> streamProvider = new StreamingResponseProvider<HelloWorldBean>();
-        streamProvider.setProduceMediaTypes(Collections.singletonList("application/json"));
-        sf.setProvider(streamProvider);
+        new ReactorCustomizer().customize(sf);
         sf.getOutInterceptors().add(new LoggingOutInterceptor());
         sf.setResourceClasses(FluxService.class, MonoService.class);
         sf.setResourceProvider(FluxService.class,
@@ -61,11 +54,8 @@ protected void run() {
         server1 = sf.create();
         
         JAXRSServerFactoryBean sf2 = new JAXRSServerFactoryBean();
-        sf2.setInvoker(new ReactorInvoker());
-        StreamingResponseProvider<HelloWorldBean> streamProvider2 = new StreamingResponseProvider<HelloWorldBean>();
-        streamProvider2.setProduceMediaTypes(Collections.singletonList("application/json"));
-        sf2.setProvider(streamProvider2);
         sf2.setProvider(new JacksonJsonProvider());
+        new ReactorCustomizer().customize(sf2);
         sf2.getOutInterceptors().add(new LoggingOutInterceptor());
         sf2.setResourceClasses(FluxService.class);
         sf2.setResourceProvider(FluxService.class,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services