You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2018/05/19 16:39:02 UTC
[cxf] branch master updated: Reimplementing SSE using AsyncContext
(Servlet 3.0+) (#415)
This is an automated email from the ASF dual-hosted git repository.
reta pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cxf.git
The following commit(s) were added to refs/heads/master by this push:
new 7aa7a09 Reimplementing SSE using AsyncContext (Servlet 3.0+) (#415)
7aa7a09 is described below
commit 7aa7a09188ae74aee5075d8723c8ff5720bc45a0
Author: Andriy Redko <dr...@gmail.com>
AuthorDate: Sat May 19 12:38:54 2018 -0400
Reimplementing SSE using AsyncContext (Servlet 3.0+) (#415)
* SSE implementation using AsyncContext instead of atmosphere
* SSE implementation using AsyncContext: refactorings
---
.../main/release/samples/jax_rs/sse_cdi/pom.xml | 5 -
.../src/main/java/demo/jaxrs/sse/StatsServer.java | 2 -
.../main/release/samples/jax_rs/sse_client/pom.xml | 5 -
.../src/main/java/demo/jaxrs/sse/StatsServer.java | 2 -
.../main/release/samples/jax_rs/sse_osgi/pom.xml | 6 -
.../main/resources/OSGI-INF/blueprint/context.xml | 7 +-
.../src/main/java/demo/jaxrs/sse/StatsConfig.java | 2 -
.../src/main/java/demo/jaxrs/sse/StatsServer.java | 2 -
.../main/release/samples/jax_rs/sse_tomcat/pom.xml | 5 -
.../src/main/java/demo/jaxrs/sse/StatsServer.java | 2 -
.../karaf/features/src/main/resources/features.xml | 1 -
.../apache/cxf/jaxrs/impl/AsyncResponseImpl.java | 2 +-
rt/rs/sse/pom.xml | 4 -
.../cxf/jaxrs/sse/OutboundSseEventBodyWriter.java | 2 +
.../apache/cxf/jaxrs/sse/SseBroadcasterImpl.java | 62 ++++--
...vider.java => SseEventSinkContextProvider.java} | 31 +--
.../org/apache/cxf/jaxrs/sse/SseEventSinkImpl.java | 161 ++++++++++++++++
.../java/org/apache/cxf/jaxrs/sse/SseFeature.java | 3 +-
.../sse/atmosphere/SseAtmosphereEventSinkImpl.java | 144 --------------
.../sse/atmosphere/SseAtmosphereInterceptor.java | 181 ------------------
.../atmosphere/SseAtmosphereInterceptorWriter.java | 31 ---
.../ext/SseTransportCustomizationExtension.java | 13 +-
.../cxf/transport/sse/SseDestinationFactory.java | 66 -------
.../cxf/transport/sse/SseHttpTransportFactory.java | 83 --------
.../cxf/transport/sse/SseProvidersExtension.java | 54 ++++++
.../AtmosphereSseServletDestination.java | 209 ---------------------
.../main/resources/META-INF/cxf/bus-extensions.txt | 2 +-
systests/jaxrs/pom.xml | 11 +-
systests/rs-sse/rs-sse-base/pom.xml | 4 +
.../cxf/systest/jaxrs/sse/AbstractSseTest.java | 23 +++
.../apache/cxf/systest/jaxrs/sse/BookStore.java | 18 ++
.../apache/cxf/systest/jaxrs/sse/BookStore2.java | 18 ++
.../jaxrs/sse/jetty/AbstractJettyServer.java | 2 -
.../src/test/resources/jaxrs_sse/WEB-INF/web.xml | 4 -
.../jaxrs/sse/tomcat/AbstractTomcatServer.java | 3 -
.../src/test/resources/jaxrs_sse/WEB-INF/web.xml | 4 -
.../jaxrs/sse/undertow/AbstractUndertowServer.java | 2 -
37 files changed, 355 insertions(+), 821 deletions(-)
diff --git a/distribution/src/main/release/samples/jax_rs/sse_cdi/pom.xml b/distribution/src/main/release/samples/jax_rs/sse_cdi/pom.xml
index 44564aa..16eacbe 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_cdi/pom.xml
+++ b/distribution/src/main/release/samples/jax_rs/sse_cdi/pom.xml
@@ -88,11 +88,6 @@
<artifactId>websocket-server</artifactId>
<version>${cxf.jetty9.version}</version>
</dependency>
-
- <dependency>
- <groupId>org.atmosphere</groupId>
- <artifactId>atmosphere-runtime</artifactId>
- </dependency>
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
diff --git a/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/java/demo/jaxrs/sse/StatsServer.java b/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/java/demo/jaxrs/sse/StatsServer.java
index f13318d..dde5496 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/java/demo/jaxrs/sse/StatsServer.java
+++ b/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/java/demo/jaxrs/sse/StatsServer.java
@@ -19,7 +19,6 @@
package demo.jaxrs.sse;
import org.apache.cxf.cdi.CXFCdiServlet;
-import org.apache.cxf.transport.sse.SseHttpTransportFactory;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.servlet.DefaultServlet;
@@ -44,7 +43,6 @@ public final class StatsServer {
// Register and map the dispatcher servlet
final CXFCdiServlet cxfServlet = new CXFCdiServlet();
final ServletHolder cxfServletHolder = new ServletHolder(cxfServlet);
- cxfServletHolder.setInitParameter(CXFCdiServlet.TRANSPORT_ID, SseHttpTransportFactory.TRANSPORT_ID);
final ServletContextHandler context = new ServletContextHandler();
context.setContextPath("/");
context.addEventListener(new Listener());
diff --git a/distribution/src/main/release/samples/jax_rs/sse_client/pom.xml b/distribution/src/main/release/samples/jax_rs/sse_client/pom.xml
index d8301dc..5a9fe3f 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_client/pom.xml
+++ b/distribution/src/main/release/samples/jax_rs/sse_client/pom.xml
@@ -53,11 +53,6 @@
<groupId>io.undertow</groupId>
<artifactId>undertow-servlet</artifactId>
</dependency>
-
- <dependency>
- <groupId>org.atmosphere</groupId>
- <artifactId>atmosphere-runtime</artifactId>
- </dependency>
</dependencies>
<profiles>
diff --git a/distribution/src/main/release/samples/jax_rs/sse_client/src/main/java/demo/jaxrs/sse/StatsServer.java b/distribution/src/main/release/samples/jax_rs/sse_client/src/main/java/demo/jaxrs/sse/StatsServer.java
index 77d847e..9f0c38f 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_client/src/main/java/demo/jaxrs/sse/StatsServer.java
+++ b/distribution/src/main/release/samples/jax_rs/sse_client/src/main/java/demo/jaxrs/sse/StatsServer.java
@@ -21,7 +21,6 @@ package demo.jaxrs.sse;
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
import org.apache.cxf.jaxrs.servlet.CXFNonSpringJaxrsServlet;
-import org.apache.cxf.transport.sse.SseHttpTransportFactory;
import io.undertow.Handlers;
import io.undertow.Undertow;
@@ -44,7 +43,6 @@ public final class StatsServer {
.setDeploymentName("sse-demo")
.addServlets(
servlet("MessageServlet", CXFNonSpringJaxrsServlet.class)
- .addInitParam(CXFNonSpringJaxrsServlet.TRANSPORT_ID, SseHttpTransportFactory.TRANSPORT_ID)
.addInitParam("jaxrs.providers", JacksonJsonProvider.class.getName())
.addInitParam("jaxrs.serviceClasses", StatsRestServiceImpl.class.getName())
.setAsyncSupported(true)
diff --git a/distribution/src/main/release/samples/jax_rs/sse_osgi/pom.xml b/distribution/src/main/release/samples/jax_rs/sse_osgi/pom.xml
index d77b806..fdfedfe 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_osgi/pom.xml
+++ b/distribution/src/main/release/samples/jax_rs/sse_osgi/pom.xml
@@ -33,7 +33,6 @@
<properties>
<cxf.version>${project.version}</cxf.version>
<!-- TODO remove these local entries after making the referenced dependency managed in parent/pom.xml -->
- <cxf.atmosphere.version>2.4.3</cxf.atmosphere.version>
<cxf.jetty92.version>9.2.15.v20160210</cxf.jetty92.version>
<cxf.jetty93.version>9.3.5.v20151012</cxf.jetty93.version>
<cxf.jetty.version>${cxf.jetty93.version}</cxf.jetty.version>
@@ -93,10 +92,5 @@
<groupId>com.fasterxml.jackson.jaxrs</groupId>
<artifactId>jackson-jaxrs-json-provider</artifactId>
</dependency>
- <!-- add atmosphere -->
- <dependency>
- <groupId>org.atmosphere</groupId>
- <artifactId>atmosphere-runtime</artifactId>
- </dependency>
</dependencies>
</project>
diff --git a/distribution/src/main/release/samples/jax_rs/sse_osgi/src/main/resources/OSGI-INF/blueprint/context.xml b/distribution/src/main/release/samples/jax_rs/sse_osgi/src/main/resources/OSGI-INF/blueprint/context.xml
index c53b4eb..36d885f 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_osgi/src/main/resources/OSGI-INF/blueprint/context.xml
+++ b/distribution/src/main/release/samples/jax_rs/sse_osgi/src/main/resources/OSGI-INF/blueprint/context.xml
@@ -31,6 +31,7 @@
<!-- Application resources -->
<bean id="statsResource" class="demo.jaxrs.server.StatsRestServiceImpl" />
<bean id="jsonProvider" class="com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider" />
+ <bean id="sseFeature" class="org.apache.cxf.jaxrs.sse.SseFeature" />
<cxf:bus>
<cxf:features>
@@ -38,14 +39,16 @@
</cxf:features>
</cxf:bus>
- <jaxrs:server id="sseSampleService" address="/" transportId="http://cxf.apache.org/transports/http/sse">
+ <jaxrs:server id="sseSampleService" address="/">
<jaxrs:serviceBeans>
<ref component-id="statsResource" />
</jaxrs:serviceBeans>
<jaxrs:providers>
- <ref component-id="corsFilter" />
<ref component-id="jsonProvider" />
</jaxrs:providers>
+ <jaxrs:features>
+ <ref component-id="sseFeature" />
+ </jaxrs:features>
</jaxrs:server>
</blueprint>
diff --git a/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsConfig.java b/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsConfig.java
index 3380050..c327a08 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsConfig.java
+++ b/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsConfig.java
@@ -26,7 +26,6 @@ import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
import org.apache.cxf.bus.spring.SpringBus;
import org.apache.cxf.endpoint.Server;
import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
-import org.apache.cxf.transport.sse.SseHttpTransportFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
@@ -50,7 +49,6 @@ public class StatsConfig {
.createEndpoint(new StatsApplication(), JAXRSServerFactoryBean.class);
factory.setServiceBean(statsRestService);
factory.setProvider(new JacksonJsonProvider());
- factory.setTransportId(SseHttpTransportFactory.TRANSPORT_ID);
return factory.create();
}
}
diff --git a/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsServer.java b/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsServer.java
index 4896045..9e23afc 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsServer.java
+++ b/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsServer.java
@@ -19,7 +19,6 @@
package demo.jaxrs.sse;
import org.apache.cxf.transport.servlet.CXFServlet;
-import org.apache.cxf.transport.sse.SseHttpTransportFactory;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.servlet.DefaultServlet;
@@ -43,7 +42,6 @@ public final class StatsServer {
// Register and map the dispatcher servlet
final ServletHolder cxfServletHolder = new ServletHolder(new CXFServlet());
- cxfServletHolder.setInitParameter(CXFServlet.TRANSPORT_ID, SseHttpTransportFactory.TRANSPORT_ID);
final ServletContextHandler context = new ServletContextHandler();
context.setContextPath("/");
context.addEventListener(new ContextLoaderListener());
diff --git a/distribution/src/main/release/samples/jax_rs/sse_tomcat/pom.xml b/distribution/src/main/release/samples/jax_rs/sse_tomcat/pom.xml
index d732846..c5ca9d0 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_tomcat/pom.xml
+++ b/distribution/src/main/release/samples/jax_rs/sse_tomcat/pom.xml
@@ -90,11 +90,6 @@
<artifactId>tomcat-embed-websocket</artifactId>
<version>${cxf.tomcat.version}</version>
</dependency>
-
- <dependency>
- <groupId>org.atmosphere</groupId>
- <artifactId>atmosphere-runtime</artifactId>
- </dependency>
</dependencies>
<profiles>
diff --git a/distribution/src/main/release/samples/jax_rs/sse_tomcat/src/main/java/demo/jaxrs/sse/StatsServer.java b/distribution/src/main/release/samples/jax_rs/sse_tomcat/src/main/java/demo/jaxrs/sse/StatsServer.java
index 6adad0b..b98af99 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_tomcat/src/main/java/demo/jaxrs/sse/StatsServer.java
+++ b/distribution/src/main/release/samples/jax_rs/sse_tomcat/src/main/java/demo/jaxrs/sse/StatsServer.java
@@ -29,7 +29,6 @@ import org.apache.catalina.startup.Tomcat;
import org.apache.catalina.webresources.DirResourceSet;
import org.apache.catalina.webresources.StandardRoot;
import org.apache.cxf.transport.servlet.CXFServlet;
-import org.apache.cxf.transport.sse.SseHttpTransportFactory;
import org.springframework.web.context.ContextLoaderListener;
public final class StatsServer {
@@ -51,7 +50,6 @@ public final class StatsServer {
context.setResources(resourcesFrom(context, "target/classes"));
final Wrapper cxfServlet = Tomcat.addServlet(context, "cxfServlet", new CXFServlet());
- cxfServlet.addInitParameter(CXFServlet.TRANSPORT_ID, SseHttpTransportFactory.TRANSPORT_ID);
cxfServlet.setAsyncSupported(true);
context.addServletMapping("/rest/*", "cxfServlet");
diff --git a/osgi/karaf/features/src/main/resources/features.xml b/osgi/karaf/features/src/main/resources/features.xml
index 87fe948..973a790 100644
--- a/osgi/karaf/features/src/main/resources/features.xml
+++ b/osgi/karaf/features/src/main/resources/features.xml
@@ -405,7 +405,6 @@
<feature name="cxf-sse" version="${project.version}">
<feature version="${project.version}">cxf-http</feature>
<feature version="${project.version}">cxf-jaxrs</feature>
- <bundle dependency='true'>mvn:org.atmosphere/atmosphere-runtime/${cxf.atmosphere.version}</bundle>
<bundle start-level="40">mvn:org.apache.cxf/cxf-rt-rs-sse/${project.version}</bundle>
<capability>
cxf.http.provider;name=sse
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java
index 3da5e54..eb3a8d2 100644
--- a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java
@@ -261,7 +261,7 @@ public class AsyncResponseImpl implements AsyncResponse, ContinuationCallback {
}
public synchronized boolean suspendContinuationIfNeeded() {
- if (!resumedByApplication && !cont.isPending() && !cont.isResumed()) {
+ if (!resumedByApplication && !isDone() && !cont.isPending() && !cont.isResumed()) {
cont.suspend(AsyncResponse.NO_TIMEOUT);
initialSuspend = false;
return true;
diff --git a/rt/rs/sse/pom.xml b/rt/rs/sse/pom.xml
index 3da405f..58ae3af 100644
--- a/rt/rs/sse/pom.xml
+++ b/rt/rs/sse/pom.xml
@@ -58,10 +58,6 @@
<scope>provided</scope>
</dependency>
<dependency>
- <groupId>org.atmosphere</groupId>
- <artifactId>atmosphere-runtime</artifactId>
- </dependency>
- <dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${cxf.mockito.version}</version>
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventBodyWriter.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventBodyWriter.java
index a97020f..28ac1a4 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventBodyWriter.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventBodyWriter.java
@@ -112,6 +112,8 @@ public class OutboundSseEventBodyWriter implements MessageBodyWriter<OutboundSse
writePayloadTo(payloadClass, payloadType, anns, p.getMediaType(), headers, p.getData(), os);
os.write(NEW_LINE);
}
+
+ os.write(NEW_LINE);
}
@SuppressWarnings("unchecked")
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
index 3884254..7a96bc7 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
@@ -18,37 +18,67 @@
*/
package org.apache.cxf.jaxrs.sse;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
+import javax.servlet.AsyncContext;
+import javax.servlet.AsyncEvent;
+import javax.servlet.AsyncListener;
import javax.ws.rs.sse.OutboundSseEvent;
import javax.ws.rs.sse.SseBroadcaster;
import javax.ws.rs.sse.SseEventSink;
public class SseBroadcasterImpl implements SseBroadcaster {
private final Set<SseEventSink> subscribers = new CopyOnWriteArraySet<>();
-
- private final Set<Consumer<SseEventSink>> closers =
- new CopyOnWriteArraySet<>();
-
- private final Set<BiConsumer<SseEventSink, Throwable>> exceptioners =
- new CopyOnWriteArraySet<>();
+ private final Set<Consumer<SseEventSink>> closers = new CopyOnWriteArraySet<>();
+ private final Set<BiConsumer<SseEventSink, Throwable>> exceptioners = new CopyOnWriteArraySet<>();
+ private final AtomicBoolean closed = new AtomicBoolean(false);
@Override
public void register(SseEventSink sink) {
+ assertNotClosed();
+
+ final SseEventSinkImpl sinkImpl = (SseEventSinkImpl)sink;
+ final AsyncContext ctx = sinkImpl.getAsyncContext();
+
+ ctx.addListener(new AsyncListener() {
+ @Override
+ public void onComplete(AsyncEvent asyncEvent) throws IOException {
+ subscribers.remove(sink);
+ }
+
+ @Override
+ public void onTimeout(AsyncEvent asyncEvent) throws IOException {
+ subscribers.remove(sink);
+ }
+
+ @Override
+ public void onError(AsyncEvent asyncEvent) throws IOException {
+ subscribers.remove(sink);
+ }
+
+ @Override
+ public void onStartAsync(AsyncEvent asyncEvent) throws IOException {
+
+ }
+ });
+
subscribers.add(sink);
}
@Override
public CompletionStage<?> broadcast(OutboundSseEvent event) {
+ assertNotClosed();
+
final Collection<CompletableFuture<?>> futures = new ArrayList<>();
-
for (SseEventSink sink: subscribers) {
try {
futures.add(sink.send(event).toCompletableFuture());
@@ -62,19 +92,29 @@ public class SseBroadcasterImpl implements SseBroadcaster {
@Override
public void onClose(Consumer<SseEventSink> subscriber) {
+ assertNotClosed();
closers.add(subscriber);
}
@Override
public void onError(BiConsumer<SseEventSink, Throwable> exceptioner) {
+ assertNotClosed();
exceptioners.add(exceptioner);
}
@Override
public void close() {
- subscribers.forEach(subscriber -> {
- subscriber.close();
- closers.forEach(closer -> closer.accept(subscriber));
- });
+ if (closed.compareAndSet(false, true)) {
+ subscribers.forEach(subscriber -> {
+ subscriber.close();
+ closers.forEach(closer -> closer.accept(subscriber));
+ });
+ }
+ }
+
+ private void assertNotClosed() {
+ if (closed.get()) {
+ throw new IllegalStateException("The SSE broadcaster is already closed");
+ }
}
}
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkContextProvider.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventSinkContextProvider.java
similarity index 64%
rename from rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkContextProvider.java
rename to rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventSinkContextProvider.java
index 8d87a3d..67e9c54 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkContextProvider.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventSinkContextProvider.java
@@ -16,22 +16,24 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.cxf.jaxrs.sse.atmosphere;
+
+
+package org.apache.cxf.jaxrs.sse;
import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.ext.MessageBodyWriter;
import javax.ws.rs.sse.OutboundSseEvent;
import javax.ws.rs.sse.SseEventSink;
import org.apache.cxf.jaxrs.ext.ContextProvider;
+import org.apache.cxf.jaxrs.impl.AsyncResponseImpl;
import org.apache.cxf.jaxrs.provider.ServerProviderFactory;
-import org.apache.cxf.jaxrs.sse.OutboundSseEventBodyWriter;
import org.apache.cxf.message.Message;
import org.apache.cxf.transport.http.AbstractHTTPDestination;
-import org.atmosphere.cpr.AtmosphereResource;
-import org.atmosphere.cpr.Broadcaster;
-public class SseAtmosphereEventSinkContextProvider implements ContextProvider<SseEventSink> {
+public class SseEventSinkContextProvider implements ContextProvider<SseEventSink> {
+
@Override
public SseEventSink createContext(Message message) {
final HttpServletRequest request = (HttpServletRequest)message.get(AbstractHTTPDestination.HTTP_REQUEST);
@@ -39,23 +41,10 @@ public class SseAtmosphereEventSinkContextProvider implements ContextProvider<Ss
throw new IllegalStateException("Unable to retrieve HTTP request from the context");
}
- final AtmosphereResource resource = (AtmosphereResource)request
- .getAttribute(AtmosphereResource.class.getName());
- if (resource == null) {
- throw new IllegalStateException("AtmosphereResource is not present, "
- + "is AtmosphereServlet configured properly?");
- }
-
- final Broadcaster broadcaster = resource.getAtmosphereConfig()
- .getBroadcasterFactory()
- .lookup(resource.uuid(), true);
-
- resource.removeFromAllBroadcasters();
- resource.setBroadcaster(broadcaster);
-
final MessageBodyWriter<OutboundSseEvent> writer = new OutboundSseEventBodyWriter(
ServerProviderFactory.getInstance(message), message.getExchange());
- return new SseAtmosphereEventSinkImpl(writer, resource);
+ final AsyncResponse async = new AsyncResponseImpl(message);
+ return new SseEventSinkImpl(writer, async, request.getAsyncContext());
}
-}
+}
\ No newline at end of file
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventSinkImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventSinkImpl.java
new file mode 100644
index 0000000..f39de07
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventSinkImpl.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.jaxrs.sse;
+
+import java.lang.annotation.Annotation;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.LockSupport;
+import java.util.logging.Logger;
+
+import javax.servlet.AsyncContext;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.ext.MessageBodyWriter;
+import javax.ws.rs.sse.OutboundSseEvent;
+import javax.ws.rs.sse.SseEventSink;
+
+import org.apache.cxf.common.logging.LogUtils;
+
+public class SseEventSinkImpl implements SseEventSink {
+ private static final Annotation[] EMPTY_ANNOTATIONS = new Annotation [] {};
+ private static final Logger LOG = LogUtils.getL7dLogger(SseEventSinkImpl.class);
+ private static final int BUFFER_SIZE = 10000; // buffering 10000 messages
+
+ private final AsyncContext ctx;
+ private final MessageBodyWriter<OutboundSseEvent> writer;
+ private final Queue<QueuedEvent> buffer;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+ private final AtomicBoolean dispatching = new AtomicBoolean(false);
+
+ public SseEventSinkImpl(final MessageBodyWriter<OutboundSseEvent> writer,
+ final AsyncResponse async, final AsyncContext ctx) {
+
+ this.writer = writer;
+ this.buffer = new ArrayBlockingQueue<>(BUFFER_SIZE);
+ this.ctx = ctx;
+
+ if (ctx == null) {
+ throw new IllegalStateException("Unable to retrieve the AsyncContext for this request. "
+ + "Is the Servlet configured properly?");
+ }
+
+ ctx.getResponse().setContentType(OutboundSseEventBodyWriter.SERVER_SENT_EVENTS);
+ }
+
+ public AsyncContext getAsyncContext() {
+ return ctx;
+ }
+
+ @Override
+ public void close() {
+ if (closed.compareAndSet(false, true)) {
+ // In case we are still dispatching, give the events the chance to be
+ // sent over to the consumers. The good example would be sent(event) call,
+ // immediately followed by the close() call.
+ if (!awaitQueueToDrain(5, TimeUnit.SECONDS)) {
+ LOG.warning("There are still SSE events the queue which may not be delivered (closing now)");
+ }
+
+ try {
+ ctx.complete();
+ } catch (final IllegalStateException ex) {
+ LOG.warning("Failed to close the AsyncContext cleanly: " + ex.getMessage());
+ }
+ }
+ }
+
+ private boolean awaitQueueToDrain(int timeout, TimeUnit unit) {
+ final long parkTime = unit.toNanos(timeout) / 20;
+ int attempt = 0;
+
+ while (dispatching.get() && ++attempt < 20) {
+ LockSupport.parkNanos(parkTime);
+ }
+
+ return buffer.isEmpty();
+ }
+
+ @Override
+ public boolean isClosed() {
+ return closed.get();
+ }
+
+ @Override
+ public CompletionStage<?> send(OutboundSseEvent event) {
+ final CompletableFuture<?> future = new CompletableFuture<>();
+
+ if (!closed.get() && writer != null) {
+ if (buffer.offer(new QueuedEvent(event, future))) {
+ if (dispatching.compareAndSet(false, true)) {
+ ctx.start(this::dequeue);
+ }
+ } else {
+ future.completeExceptionally(new IllegalStateException(
+ "The buffer is full (10000), unable to queue SSE event for send"));
+ }
+ } else {
+ future.completeExceptionally(new IllegalStateException(
+ "The sink is already closed, unable to queue SSE event for send"));
+ }
+
+ return future;
+ }
+
+ private void dequeue() {
+ try {
+ while (true) {
+ final QueuedEvent qeuedEvent = buffer.poll();
+
+ // Nothing queued, release the thread
+ if (qeuedEvent == null) {
+ break;
+ }
+
+ final OutboundSseEvent event = qeuedEvent.event;
+ final CompletableFuture<?> future = qeuedEvent.completion;
+
+ try {
+ writer.writeTo(event, event.getClass(), event.getGenericType(), EMPTY_ANNOTATIONS,
+ event.getMediaType(), null, ctx.getResponse().getOutputStream());
+ ctx.getResponse().flushBuffer();
+ future.complete(null);
+ } catch (final Exception ex) {
+ future.completeExceptionally(ex);
+ }
+ }
+ } finally {
+ dispatching.set(false);
+ }
+ }
+
+ private static class QueuedEvent {
+ private final OutboundSseEvent event;
+ private final CompletableFuture<?> completion;
+
+ QueuedEvent(OutboundSseEvent event, CompletableFuture<?> completion) {
+ this.event = event;
+ this.completion = completion;
+ }
+ }
+}
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java
index 36ea9ed..9ecbe27 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java
@@ -28,7 +28,6 @@ import org.apache.cxf.annotations.Provider.Type;
import org.apache.cxf.endpoint.Server;
import org.apache.cxf.feature.AbstractFeature;
import org.apache.cxf.jaxrs.provider.ServerProviderFactory;
-import org.apache.cxf.jaxrs.sse.atmosphere.SseAtmosphereEventSinkContextProvider;
@Provider(value = Type.Feature, scope = Scope.Server)
public class SseFeature extends AbstractFeature {
@@ -36,8 +35,8 @@ public class SseFeature extends AbstractFeature {
public void initialize(Server server, Bus bus) {
final List<Object> providers = new ArrayList<>();
- providers.add(new SseAtmosphereEventSinkContextProvider());
providers.add(new SseContextProvider());
+ providers.add(new SseEventSinkContextProvider());
((ServerProviderFactory) server.getEndpoint().get(
ServerProviderFactory.class.getName())).setUserProviders(providers);
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkImpl.java
deleted file mode 100644
index af984e7..0000000
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkImpl.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.sse.atmosphere;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.lang.annotation.Annotation;
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.logging.Logger;
-
-import javax.ws.rs.ext.MessageBodyWriter;
-import javax.ws.rs.sse.OutboundSseEvent;
-import javax.ws.rs.sse.SseEventSink;
-
-import org.apache.cxf.common.logging.LogUtils;
-import org.atmosphere.cpr.AtmosphereResource;
-import org.atmosphere.cpr.AtmosphereResponse;
-import org.atmosphere.cpr.Broadcaster;
-
-import static org.atmosphere.cpr.ApplicationConfig.PROPERTY_USE_STREAM;
-
-public class SseAtmosphereEventSinkImpl implements SseEventSink {
- private static final Logger LOG = LogUtils.getL7dLogger(SseAtmosphereEventSinkImpl.class);
-
- private final AtmosphereResource resource;
- private final MessageBodyWriter<OutboundSseEvent> writer;
- private final boolean usingStream;
-
- private volatile boolean closed;
-
- public SseAtmosphereEventSinkImpl(final MessageBodyWriter<OutboundSseEvent> writer,
- final AtmosphereResource resource) {
- this.writer = writer;
- this.resource = resource;
- this.usingStream = (Boolean)resource.getRequest().getAttribute(PROPERTY_USE_STREAM);
-
- if (!resource.isSuspended()) {
- LOG.fine("Atmosphere resource is not suspended, suspending");
- resource.suspend();
- }
- }
-
- @Override
- public void close() {
- if (!closed) {
- closed = true;
-
- LOG.fine("Closing Atmosphere SSE event output");
- if (resource.isSuspended()) {
- LOG.fine("Atmosphere resource is suspended, resuming");
- resource.resume();
- }
-
- final Broadcaster broadcaster = resource.getBroadcaster();
- resource.removeFromAllBroadcasters();
-
- try {
- final AtmosphereResponse response = resource.getResponse();
-
- try {
- // The resource's request property is null here, we have to
- // rely on initial request to understand what to close, response
- // or stream.
- if (usingStream) {
- response.getOutputStream().close();
- } else {
- response.getWriter().close();
- }
- } catch (final IOException ex) {
- LOG.warning("Failed to flush AtmosphereResponse buffer: "
- + ex.getMessage());
- }
- } finally {
- try {
- resource.close();
- } catch (IOException ex) {
- // ignore
- }
- broadcaster.destroy();
- LOG.fine("Atmosphere SSE event output is closed");
- }
- }
- }
-
- @Override
- public CompletionStage<?> send(OutboundSseEvent event) {
- final CompletableFuture<?> future = new CompletableFuture<>();
-
- if (!closed && writer != null) {
- try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
- writer.writeTo(event, event.getClass(), null, new Annotation [] {}, event.getMediaType(), null, os);
-
- // Atmosphere broadcasts asynchronously which is acceptable in most cases.
- // Unfortunately, calling close() may lead to response stream being closed
- // while there are still some SSE delivery scheduled.
- return CompletableFuture.completedFuture(
- resource
- .getBroadcaster()
- .broadcast(os.toString(StandardCharsets.UTF_8.name()))
- .get(1, TimeUnit.SECONDS)
- );
- } catch (final IOException ex) {
- LOG.warning("While writing the SSE event, an exception was raised: " + ex);
- future.completeExceptionally(ex);
- } catch (final ExecutionException | InterruptedException ex) {
- LOG.warning("SSE Atmosphere response was not delivered");
- future.completeExceptionally(ex);
- } catch (final TimeoutException ex) {
- LOG.warning("SSE Atmosphere response was not delivered within default timeout");
- future.completeExceptionally(ex);
- }
- } else {
- future.complete(null);
- }
-
- return future;
- }
-
- @Override
- public boolean isClosed() {
- return closed;
- }
-}
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptor.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptor.java
deleted file mode 100644
index ab8ef5c..0000000
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptor.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.sse.atmosphere;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.PrintWriter;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.cxf.common.logging.LogUtils;
-import org.atmosphere.cpr.Action;
-import org.atmosphere.cpr.AsyncIOInterceptorAdapter;
-import org.atmosphere.cpr.AsyncIOWriter;
-import org.atmosphere.cpr.AtmosphereInterceptorWriter;
-import org.atmosphere.cpr.AtmosphereRequest;
-import org.atmosphere.cpr.AtmosphereResource;
-import org.atmosphere.cpr.AtmosphereResourceEvent;
-import org.atmosphere.cpr.AtmosphereResourceEventListenerAdapter.OnPreSuspend;
-import org.atmosphere.cpr.AtmosphereResponse;
-import org.atmosphere.interceptor.AllowInterceptor;
-import org.atmosphere.interceptor.SSEAtmosphereInterceptor;
-import org.atmosphere.util.Utils;
-
-import static org.apache.cxf.jaxrs.sse.OutboundSseEventBodyWriter.SERVER_SENT_EVENTS;
-import static org.atmosphere.cpr.ApplicationConfig.PROPERTY_USE_STREAM;
-import static org.atmosphere.cpr.FrameworkConfig.CALLBACK_JAVASCRIPT_PROTOCOL;
-import static org.atmosphere.cpr.FrameworkConfig.CONTAINER_RESPONSE;
-
-/**
- * Most of this class implementation is borrowed from SSEAtmosphereInterceptor. The original
- * implementation does two things which do not fit well into SSE support:
- * - closes the response stream (overridden by SseAtmosphereInterceptorWriter)
- * - wraps the whatever object is being written to SSE payload (overridden using
- * the complete SSE protocol)
- */
-public class SseAtmosphereInterceptor extends SSEAtmosphereInterceptor {
- private static final Logger LOG = LogUtils.getL7dLogger(SseAtmosphereInterceptor.class);
-
- private static final byte[] PADDING;
- private static final String PADDING_TEXT;
- private static final byte[] END = "\r\n\r\n".getBytes();
-
- static {
- StringBuffer whitespace = new StringBuffer();
- for (int i = 0; i < 2000; i++) {
- whitespace.append(" ");
- }
- whitespace.append("\n");
- PADDING_TEXT = whitespace.toString();
- PADDING = PADDING_TEXT.getBytes();
- }
-
- private boolean writePadding(AtmosphereResponse response) {
- if (response.request() != null && response.request().getAttribute("paddingWritten") != null) {
- return false;
- }
-
- response.setContentType(SERVER_SENT_EVENTS);
- response.setCharacterEncoding("utf-8");
- boolean isUsingStream = (Boolean) response.request().getAttribute(PROPERTY_USE_STREAM);
- if (isUsingStream) {
- try {
- OutputStream stream = response.getResponse().getOutputStream();
- try {
- stream.write(PADDING);
- stream.flush();
- } catch (IOException ex) {
- LOG.log(Level.WARNING, "SSE may not work", ex);
- }
- } catch (IOException e) {
- LOG.log(Level.FINEST, "", e);
- }
- } else {
- try {
- PrintWriter w = response.getResponse().getWriter();
- w.println(PADDING_TEXT);
- w.flush();
- } catch (IOException e) {
- LOG.log(Level.FINEST, "", e);
- }
- }
- response.resource().getRequest().setAttribute("paddingWritten", "true");
- return true;
- }
-
- @Override
- public Action inspect(final AtmosphereResource r) {
- if (Utils.webSocketMessage(r)) {
- return Action.CONTINUE;
- }
-
- final AtmosphereRequest request = r.getRequest();
- final String accept = request.getHeader("Accept") == null ? "text/plain" : request.getHeader("Accept").trim();
-
- if (r.transport().equals(AtmosphereResource.TRANSPORT.SSE) || SERVER_SENT_EVENTS.equalsIgnoreCase(accept)) {
- final AtmosphereResponse response = r.getResponse();
- if (response.getAsyncIOWriter() == null) {
- response.asyncIOWriter(new SseAtmosphereInterceptorWriter());
- }
-
- r.addEventListener(new P(response));
-
- AsyncIOWriter writer = response.getAsyncIOWriter();
- if (AtmosphereInterceptorWriter.class.isAssignableFrom(writer.getClass())) {
- AtmosphereInterceptorWriter.class.cast(writer).interceptor(new AsyncIOInterceptorAdapter() {
- private boolean padding() {
- if (!r.isSuspended()) {
- return writePadding(response);
- }
- return false;
- }
-
- @Override
- public void prePayload(AtmosphereResponse response, byte[] data, int offset, int length) {
- padding();
- }
-
- @Override
- public void postPayload(AtmosphereResponse response, byte[] data, int offset, int length) {
- // The CALLBACK_JAVASCRIPT_PROTOCOL may be called by a framework running on top of Atmosphere
- // In that case, we must pad/protocol indenendently of the state of the AtmosphereResource
- if (r.isSuspended() || r.getRequest().getAttribute(CALLBACK_JAVASCRIPT_PROTOCOL) != null
- || r.getRequest().getAttribute(CONTAINER_RESPONSE) != null) {
- response.write(END, true);
- }
-
- /**
- * When used with https://github.com/remy/polyfills/blob/master/EventSource.js , we
- * resume after every message.
- */
- String ua = r.getRequest().getHeader("User-Agent");
- if (ua != null && ua.contains("MSIE")) {
- try {
- response.flushBuffer();
- } catch (IOException e) {
- LOG.log(Level.FINEST, "", e);
- }
- r.resume();
- }
- }
- });
- } else {
- LOG.warning(String.format("Unable to apply %s. Your AsyncIOWriter must implement %s",
- getClass().getName(), AtmosphereInterceptorWriter.class.getName()));
- }
- }
-
- return Action.CONTINUE;
- }
-
- private final class P extends OnPreSuspend implements AllowInterceptor {
-
- private final AtmosphereResponse response;
-
- private P(AtmosphereResponse response) {
- this.response = response;
- }
-
- @Override
- public void onPreSuspend(AtmosphereResourceEvent event) {
- writePadding(response);
- }
- }
-}
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptorWriter.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptorWriter.java
deleted file mode 100644
index cfafe87..0000000
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptorWriter.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.sse.atmosphere;
-
-import java.io.IOException;
-
-import org.atmosphere.cpr.AtmosphereInterceptorWriter;
-import org.atmosphere.cpr.AtmosphereResponse;
-
-public class SseAtmosphereInterceptorWriter extends AtmosphereInterceptorWriter {
- @Override
- public void close(AtmosphereResponse response) throws IOException {
- // Do not close the response, keep output stream open
- }
-}
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/ext/SseTransportCustomizationExtension.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/ext/SseTransportCustomizationExtension.java
index b0123d0..c6267d4 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/ext/SseTransportCustomizationExtension.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/ext/SseTransportCustomizationExtension.java
@@ -18,24 +18,15 @@
*/
package org.apache.cxf.jaxrs.sse.ext;
-import org.apache.cxf.Bus;
import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
import org.apache.cxf.jaxrs.ext.JAXRSServerFactoryCustomizationExtension;
import org.apache.cxf.jaxrs.sse.SseContextProvider;
-import org.apache.cxf.jaxrs.sse.atmosphere.SseAtmosphereEventSinkContextProvider;
-import org.apache.cxf.transport.AbstractTransportFactory;
-import org.apache.cxf.transport.sse.SseHttpTransportFactory;
+import org.apache.cxf.jaxrs.sse.SseEventSinkContextProvider;
public class SseTransportCustomizationExtension implements JAXRSServerFactoryCustomizationExtension {
@Override
public void customize(final JAXRSServerFactoryBean bean) {
- bean.setTransportId(SseHttpTransportFactory.TRANSPORT_ID);
bean.setProvider(new SseContextProvider());
- bean.setProvider(new SseAtmosphereEventSinkContextProvider());
-
- final Bus bus = bean.getBus();
- if (bus != null) {
- bus.setProperty(AbstractTransportFactory.PREFERRED_TRANSPORT_ID, SseHttpTransportFactory.TRANSPORT_ID);
- }
+ bean.setProvider(new SseEventSinkContextProvider());
}
}
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseDestinationFactory.java b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseDestinationFactory.java
deleted file mode 100644
index b1ca29d..0000000
--- a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseDestinationFactory.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.transport.sse;
-
-import java.io.IOException;
-import java.util.logging.Logger;
-
-import org.apache.cxf.Bus;
-import org.apache.cxf.common.logging.LogUtils;
-import org.apache.cxf.service.model.EndpointInfo;
-import org.apache.cxf.transport.DestinationFactory;
-import org.apache.cxf.transport.DestinationFactoryManager;
-import org.apache.cxf.transport.http.AbstractHTTPDestination;
-import org.apache.cxf.transport.http.DestinationRegistry;
-import org.apache.cxf.transport.http.HTTPTransportFactory;
-import org.apache.cxf.transport.http.HttpDestinationFactory;
-import org.apache.cxf.transport.sse.atmosphere.AtmosphereSseServletDestination;
-
-public class SseDestinationFactory implements HttpDestinationFactory {
- private static final Logger LOG = LogUtils.getL7dLogger(SseDestinationFactory.class);
-
- @Override
- public AbstractHTTPDestination createDestination(EndpointInfo endpointInfo, Bus bus,
- DestinationRegistry registry) throws IOException {
- return new AtmosphereSseServletDestination(bus, getDestinationRegistry(bus), endpointInfo,
- endpointInfo.getAddress());
- }
-
- /**
- * The destination factory should be taken from HTTP transport as a workaround to
- * register the destinations properly in the OSGi container.
- */
- private static DestinationRegistry getDestinationRegistry(Bus bus) {
- DestinationFactoryManager dfm = bus.getExtension(DestinationFactoryManager.class);
-
- try {
- DestinationFactory df = dfm.getDestinationFactory("http://cxf.apache.org/transports/http/configuration");
- if (df instanceof HTTPTransportFactory) {
- HTTPTransportFactory transportFactory = (HTTPTransportFactory)df;
- return transportFactory.getRegistry();
- }
- } catch (final Exception ex) {
- LOG.warning("Unable to deduct the destination registry from HTTP transport: " + ex.getMessage());
- }
-
- return null;
- }
-
-
-}
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseHttpTransportFactory.java b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseHttpTransportFactory.java
deleted file mode 100644
index a1f0d68..0000000
--- a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseHttpTransportFactory.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.transport.sse;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.cxf.Bus;
-import org.apache.cxf.common.injection.NoJSR250Annotations;
-import org.apache.cxf.service.model.EndpointInfo;
-import org.apache.cxf.transport.ConduitInitiator;
-import org.apache.cxf.transport.Destination;
-import org.apache.cxf.transport.DestinationFactory;
-import org.apache.cxf.transport.http.AbstractHTTPDestination;
-import org.apache.cxf.transport.http.DestinationRegistry;
-import org.apache.cxf.transport.http.HTTPTransportFactory;
-
-@NoJSR250Annotations
-public class SseHttpTransportFactory extends HTTPTransportFactory
- implements ConduitInitiator, DestinationFactory {
-
- public static final String TRANSPORT_ID = "http://cxf.apache.org/transports/http/sse";
- public static final List<String> DEFAULT_NAMESPACES = Collections.unmodifiableList(Arrays.asList(
- TRANSPORT_ID,
- "http://cxf.apache.org/transports/http/sse/configuration"
- ));
-
- private final SseDestinationFactory factory = new SseDestinationFactory();
-
- public SseHttpTransportFactory() {
- this(null);
- }
-
- public SseHttpTransportFactory(DestinationRegistry registry) {
- super(DEFAULT_NAMESPACES, registry);
- }
-
- @Override
- public Destination getDestination(EndpointInfo endpointInfo, Bus bus) throws IOException {
- if (endpointInfo == null) {
- throw new IllegalArgumentException("EndpointInfo cannot be null");
- }
-
- // In order to register the destination in the OSGi container, we have to
- // include it into 2 registries basically: for HTTP transport and the current
- // one. The workaround is borrow from org.apache.cxf.transport.websocket.WebSocketTransportFactory,
- // it seems like no better option exists at the moment.
- synchronized (registry) {
- AbstractHTTPDestination d = registry.getDestinationForPath(endpointInfo.getAddress());
-
- if (d == null) {
- d = factory.createDestination(endpointInfo, bus, registry);
- if (d == null) {
- throw new IOException("No destination available. The CXF SSE transport needs Atmosphere"
- + " dependencies to be available");
- }
- registry.addDestination(d);
- configure(bus, d);
- d.finalizeConfig();
- }
-
- return d;
- }
- }
-}
\ No newline at end of file
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseProvidersExtension.java b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseProvidersExtension.java
new file mode 100644
index 0000000..0277611
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseProvidersExtension.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.transport.sse;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.buslifecycle.BusCreationListener;
+import org.apache.cxf.helpers.CastUtils;
+import org.apache.cxf.jaxrs.sse.SseContextProvider;
+import org.apache.cxf.jaxrs.sse.SseEventSinkContextProvider;
+
+public class SseProvidersExtension implements BusCreationListener {
+
+ private static final String BUS_PROVIDERS = "org.apache.cxf.jaxrs.bus.providers";
+
+ @Override
+ public void busCreated(Bus bus) {
+ Object providers = bus.getProperty(BUS_PROVIDERS);
+
+ final List<?> sseProviders =
+ Arrays.asList(
+ new SseContextProvider(),
+ new SseEventSinkContextProvider()
+ );
+
+ if (providers instanceof List) {
+ final List<?> existing = new ArrayList<>((List<?>)providers);
+ existing.addAll(CastUtils.cast(sseProviders));
+ bus.setProperty(BUS_PROVIDERS, existing);
+ } else {
+ bus.setProperty(BUS_PROVIDERS, sseProviders);
+ }
+ }
+
+}
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/atmosphere/AtmosphereSseServletDestination.java b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/atmosphere/AtmosphereSseServletDestination.java
deleted file mode 100644
index add8afa..0000000
--- a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/atmosphere/AtmosphereSseServletDestination.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cxf.transport.sse.atmosphere;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.List;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.servlet.ServletConfig;
-import javax.servlet.ServletContext;
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import javax.ws.rs.core.HttpHeaders;
-import javax.ws.rs.core.MediaType;
-
-import org.apache.cxf.Bus;
-import org.apache.cxf.common.logging.LogUtils;
-import org.apache.cxf.jaxrs.sse.SseFeature;
-import org.apache.cxf.jaxrs.sse.atmosphere.SseAtmosphereInterceptor;
-import org.apache.cxf.message.Message;
-import org.apache.cxf.service.model.EndpointInfo;
-import org.apache.cxf.transport.http.DestinationRegistry;
-import org.apache.cxf.transport.http.Headers;
-import org.apache.cxf.transport.servlet.ServletDestination;
-import org.atmosphere.cache.UUIDBroadcasterCache;
-import org.atmosphere.cpr.ApplicationConfig;
-import org.atmosphere.cpr.AtmosphereFramework;
-import org.atmosphere.cpr.AtmosphereRequestImpl;
-import org.atmosphere.cpr.AtmosphereResource;
-import org.atmosphere.cpr.AtmosphereResponseImpl;
-import org.atmosphere.handler.AbstractReflectorAtmosphereHandler;
-
-public class AtmosphereSseServletDestination extends ServletDestination {
- private static final Logger LOG = LogUtils.getL7dLogger(AtmosphereSseServletDestination.class);
-
- private AtmosphereFramework framework;
-
- public AtmosphereSseServletDestination(Bus bus, DestinationRegistry registry,
- EndpointInfo ei, String path) throws IOException {
- super(bus, registry, ei, path);
-
- framework = create();
-
- bus.getFeatures().add(new SseFeature());
- }
-
- private AtmosphereFramework create() {
- final AtmosphereFramework instance = new AtmosphereFramework(true, false);
-
- instance.interceptor(new SseAtmosphereInterceptor());
- instance.addInitParameter(ApplicationConfig.PROPERTY_NATIVE_COMETSUPPORT, "true");
- instance.addInitParameter(ApplicationConfig.WEBSOCKET_SUPPORT, "true");
- instance.addInitParameter(ApplicationConfig.DISABLE_ATMOSPHEREINTERCEPTOR, "true");
- instance.addInitParameter(ApplicationConfig.CLOSE_STREAM_ON_CANCEL, "true");
- // Atmosphere does not limit amount of threads and application can crash in no time
- // https://github.com/Atmosphere/atmosphere/wiki/Configuring-Atmosphere-for-Performance
- instance.addInitParameter(ApplicationConfig.BROADCASTER_MESSAGE_PROCESSING_THREADPOOL_MAXSIZE, "20");
- instance.addInitParameter(ApplicationConfig.BROADCASTER_ASYNC_WRITE_THREADPOOL_MAXSIZE, "20");
- // workaround for atmosphere's jsr356 initialization requiring servletConfig
- instance.addInitParameter(ApplicationConfig.WEBSOCKET_SUPPRESS_JSR356, "true");
-
- instance.setBroadcasterCacheClassName(UUIDBroadcasterCache.class.getName());
- instance.addAtmosphereHandler("/", new DestinationHandler());
-
- return instance;
- }
-
- @Override
- public void onServletConfigAvailable(ServletConfig config) throws ServletException {
- // Very likely there is JSR-356 implementation available, let us reconfigure the Atmosphere framework
- // to use it since ServletConfig instance is already available.
- final Object container = config.getServletContext()
- .getAttribute("javax.websocket.server.ServerContainer");
-
- if (container != null) {
- if (framework.initialized()) {
- framework.destroy();
- }
-
- framework = create();
- framework.addInitParameter(ApplicationConfig.PROPERTY_NATIVE_COMETSUPPORT, "false");
- framework.addInitParameter(ApplicationConfig.WEBSOCKET_SUPPRESS_JSR356, "false");
-
- framework.init(config);
- }
- }
-
- @Override
- public void finalizeConfig() {
- if (framework.initialized()) {
- return;
- }
-
- final ServletContext ctx = bus.getExtension(ServletContext.class);
- if (ctx != null) {
- try {
- framework.init(new ServletConfig() {
- @Override
- public String getServletName() {
- return null;
- }
- @Override
- public ServletContext getServletContext() {
- return ctx;
- }
- @Override
- public String getInitParameter(String name) {
- return null;
- }
-
- @Override
- public Enumeration<String> getInitParameterNames() {
- return null;
- }
- });
- } catch (ServletException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- } else {
- framework.init();
- }
- }
-
- @Override
- public void invoke(ServletConfig config, ServletContext context, HttpServletRequest req,
- HttpServletResponse resp) throws IOException {
- try {
- framework.doCometSupport(AtmosphereRequestImpl.wrap(req), AtmosphereResponseImpl.wrap(resp));
- } catch (ServletException e) {
- throw new IOException(e);
- }
- }
-
- @Override
- public void shutdown() {
- try {
- framework.destroy();
- } catch (Exception ex) {
- LOG.warning("Graceful shutdown was not successful: " + ex.getMessage());
- } finally {
- super.shutdown();
- }
- }
-
- private class DestinationHandler extends AbstractReflectorAtmosphereHandler {
- @Override
- public void onRequest(final AtmosphereResource resource) throws IOException {
- LOG.fine("onRequest");
- try {
- AtmosphereSseServletDestination.super.invoke(null, resource.getRequest().getServletContext(),
- resource.getRequest(), resource.getResponse());
- } catch (Exception e) {
- LOG.log(Level.WARNING, "Failed to invoke service", e);
- }
- }
- }
-
- @Override
- protected OutputStream flushHeaders(Message outMessage, boolean getStream) throws IOException {
- adjustContentLength(outMessage);
- return super.flushHeaders(outMessage, getStream);
- }
-
- @Override
- protected OutputStream flushHeaders(Message outMessage) throws IOException {
- adjustContentLength(outMessage);
- return super.flushHeaders(outMessage);
- }
-
- /**
- * It has been noticed that Jetty checks the "Content-Length" header and completes the
- * response if its value is 0 (or matches the number of bytes written). However, in case
- * of SSE the content length is unknown so we are setting it to -1 before flushing the
- * response. Otherwise, only the first event is going to be sent and response is going to
- * be closed.
- */
- private void adjustContentLength(Message outMessage) {
- final String contentType = (String)outMessage.get(Message.CONTENT_TYPE);
-
- if (MediaType.SERVER_SENT_EVENTS.equalsIgnoreCase(contentType)) {
- final Map<String, List<String>> headers = Headers.getSetProtocolHeaders(outMessage);
- headers.put(HttpHeaders.CONTENT_LENGTH, Collections.singletonList("-1"));
- }
- }
-}
diff --git a/rt/rs/sse/src/main/resources/META-INF/cxf/bus-extensions.txt b/rt/rs/sse/src/main/resources/META-INF/cxf/bus-extensions.txt
index 643b51c..97a6b9f 100644
--- a/rt/rs/sse/src/main/resources/META-INF/cxf/bus-extensions.txt
+++ b/rt/rs/sse/src/main/resources/META-INF/cxf/bus-extensions.txt
@@ -1 +1 @@
-org.apache.cxf.transport.sse.SseHttpTransportFactory::true
\ No newline at end of file
+org.apache.cxf.transport.sse.SseProvidersExtension::true
\ No newline at end of file
diff --git a/systests/jaxrs/pom.xml b/systests/jaxrs/pom.xml
index ee2fc13..73e3d0b 100644
--- a/systests/jaxrs/pom.xml
+++ b/systests/jaxrs/pom.xml
@@ -537,17 +537,14 @@
<scope>test</scope>
</dependency>
<dependency>
- <!-- activate atmosphere. Its use can be disabled by setting
- org.apache.cxf.transport.websocket.atmosphere.disabled to "true" -->
- <groupId>org.atmosphere</groupId>
- <artifactId>atmosphere-runtime</artifactId>
- <version>${cxf.atmosphere.version}</version>
- </dependency>
- <dependency>
<groupId>org.qi4j.library</groupId>
<artifactId>org.qi4j.library.circuitbreaker</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/systests/rs-sse/rs-sse-base/pom.xml b/systests/rs-sse/rs-sse-base/pom.xml
index 04584c2..260f609 100644
--- a/systests/rs-sse/rs-sse-base/pom.xml
+++ b/systests/rs-sse/rs-sse-base/pom.xml
@@ -49,5 +49,9 @@
<artifactId>cxf-testutils</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
index 5c8bf99..1e32b4b 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
@@ -177,6 +177,29 @@ public abstract class AbstractSseTest extends AbstractSseBaseTest {
r.close();
}
+ @Test
+ public void testBooksStreamIsReturnedFromInboundSseEventsNoDelay() throws InterruptedException {
+ final WebTarget target = createWebTarget("/rest/api/bookstore/nodelay/sse/0");
+ final Collection<Book> books = new ArrayList<>();
+
+ try (SseEventSource eventSource = SseEventSource.target(target).build()) {
+ eventSource.register(collect(books), System.out::println);
+ eventSource.open();
+ // Give the SSE stream some time to collect all events
+ awaitEvents(5000, books, 5);
+ }
+ // Easing the test verification here, it does not work well for Atm + Jetty
+ assertThat(books,
+ hasItems(
+ new Book("New Book #1", 1),
+ new Book("New Book #2", 2),
+ new Book("New Book #3", 3),
+ new Book("New Book #4", 4),
+ new Book("New Book #5", 5)
+ )
+ );
+ }
+
private static Consumer<InboundSseEvent> collect(final Collection< Book > books) {
return event -> books.add(event.readData(Book.class, MediaType.APPLICATION_JSON_TYPE));
}
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
index ec6b1c0..ad2387f 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
@@ -20,6 +20,7 @@ package org.apache.cxf.systest.jaxrs.sse;
import java.util.Arrays;
import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -92,6 +93,23 @@ public class BookStore {
}
}.start();
}
+
+ @GET
+ @Path("nodelay/sse/{id}")
+ @Produces(MediaType.SERVER_SENT_EVENTS)
+ public void forBookNoDelay(@Context SseEventSink sink, @PathParam("id") final String id) {
+ final Builder builder = sse.newEventBuilder();
+
+ CompletableFuture
+ .runAsync(() -> {
+ sink.send(createStatsEvent(builder.name("book"), 1));
+ sink.send(createStatsEvent(builder.name("book"), 2));
+ sink.send(createStatsEvent(builder.name("book"), 3));
+ sink.send(createStatsEvent(builder.name("book"), 4));
+ sink.send(createStatsEvent(builder.name("book"), 5));
+ })
+ .whenComplete((r, ex) -> sink.close());
+ }
@GET
@Path("nodata")
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
index dc76876..f78c5ce 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
@@ -20,6 +20,7 @@ package org.apache.cxf.systest.jaxrs.sse;
import java.util.Arrays;
import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -93,6 +94,23 @@ public class BookStore2 {
}
@GET
+ @Path("nodelay/sse/{id}")
+ @Produces(MediaType.SERVER_SENT_EVENTS)
+ public void forBookNoDelay(@Context SseEventSink sink, @PathParam("id") final String id) {
+ final Builder builder = sse.newEventBuilder();
+
+ CompletableFuture
+ .runAsync(() -> {
+ sink.send(createStatsEvent(builder.name("book"), 1));
+ sink.send(createStatsEvent(builder.name("book"), 2));
+ sink.send(createStatsEvent(builder.name("book"), 3));
+ sink.send(createStatsEvent(builder.name("book"), 4));
+ sink.send(createStatsEvent(builder.name("book"), 5));
+ })
+ .whenComplete((r, ex) -> sink.close());
+ }
+
+ @GET
@Path("broadcast/sse")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void broadcast(@Context SseEventSink sink) {
diff --git a/systests/rs-sse/rs-sse-jetty/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/AbstractJettyServer.java b/systests/rs-sse/rs-sse-jetty/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/AbstractJettyServer.java
index a47c0c3..89deec5 100644
--- a/systests/rs-sse/rs-sse-jetty/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/AbstractJettyServer.java
+++ b/systests/rs-sse/rs-sse-jetty/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/AbstractJettyServer.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
import org.apache.cxf.jaxrs.servlet.CXFNonSpringJaxrsServlet;
import org.apache.cxf.systest.jaxrs.sse.BookStore;
import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
-import org.apache.cxf.transport.sse.SseHttpTransportFactory;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.DefaultHandler;
@@ -57,7 +56,6 @@ public abstract class AbstractJettyServer extends AbstractBusTestServerBase {
if (resourcePath == null) {
// Register and map the dispatcher servlet
final ServletHolder holder = new ServletHolder(new CXFNonSpringJaxrsServlet());
- holder.setInitParameter(CXFNonSpringJaxrsServlet.TRANSPORT_ID, SseHttpTransportFactory.TRANSPORT_ID);
holder.setInitParameter("jaxrs.serviceClasses", BookStore.class.getName());
holder.setInitParameter("jaxrs.providers", JacksonJsonProvider.class.getName());
final ServletContextHandler context = new ServletContextHandler();
diff --git a/systests/rs-sse/rs-sse-jetty/src/test/resources/jaxrs_sse/WEB-INF/web.xml b/systests/rs-sse/rs-sse-jetty/src/test/resources/jaxrs_sse/WEB-INF/web.xml
index 64746acb..8133ebf 100644
--- a/systests/rs-sse/rs-sse-jetty/src/test/resources/jaxrs_sse/WEB-INF/web.xml
+++ b/systests/rs-sse/rs-sse-jetty/src/test/resources/jaxrs_sse/WEB-INF/web.xml
@@ -8,10 +8,6 @@
<load-on-startup>1</load-on-startup>
<async-supported>true</async-supported>
<init-param>
- <param-name>transportId</param-name>
- <param-value>http://cxf.apache.org/transports/http/sse</param-value>
- </init-param>
- <init-param>
<param-name>javax.ws.rs.Application</param-name>
<param-value>org.apache.cxf.systest.jaxrs.sse.SseApplication</param-value>
</init-param>
diff --git a/systests/rs-sse/rs-sse-tomcat/src/test/java/org/apache/cxf/systest/jaxrs/sse/tomcat/AbstractTomcatServer.java b/systests/rs-sse/rs-sse-tomcat/src/test/java/org/apache/cxf/systest/jaxrs/sse/tomcat/AbstractTomcatServer.java
index ded2a64..799a134 100644
--- a/systests/rs-sse/rs-sse-tomcat/src/test/java/org/apache/cxf/systest/jaxrs/sse/tomcat/AbstractTomcatServer.java
+++ b/systests/rs-sse/rs-sse-tomcat/src/test/java/org/apache/cxf/systest/jaxrs/sse/tomcat/AbstractTomcatServer.java
@@ -30,7 +30,6 @@ import org.apache.catalina.startup.Tomcat;
import org.apache.cxf.jaxrs.servlet.CXFNonSpringJaxrsServlet;
import org.apache.cxf.systest.jaxrs.sse.BookStore;
import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
-import org.apache.cxf.transport.sse.SseHttpTransportFactory;
public abstract class AbstractTomcatServer extends AbstractBusTestServerBase {
@@ -60,8 +59,6 @@ public abstract class AbstractTomcatServer extends AbstractBusTestServerBase {
if (resourcePath == null) {
final Context context = server.addContext("/", base.getAbsolutePath());
final Wrapper cxfServlet = Tomcat.addServlet(context, "cxfServlet", new CXFNonSpringJaxrsServlet());
- cxfServlet.addInitParameter(CXFNonSpringJaxrsServlet.TRANSPORT_ID,
- SseHttpTransportFactory.TRANSPORT_ID);
cxfServlet.addInitParameter("jaxrs.serviceClasses", BookStore.class.getName());
cxfServlet.addInitParameter("jaxrs.providers", JacksonJsonProvider.class.getName());
cxfServlet.setAsyncSupported(true);
diff --git a/systests/rs-sse/rs-sse-tomcat/src/test/resources/jaxrs_sse/WEB-INF/web.xml b/systests/rs-sse/rs-sse-tomcat/src/test/resources/jaxrs_sse/WEB-INF/web.xml
index 64746acb..8133ebf 100644
--- a/systests/rs-sse/rs-sse-tomcat/src/test/resources/jaxrs_sse/WEB-INF/web.xml
+++ b/systests/rs-sse/rs-sse-tomcat/src/test/resources/jaxrs_sse/WEB-INF/web.xml
@@ -8,10 +8,6 @@
<load-on-startup>1</load-on-startup>
<async-supported>true</async-supported>
<init-param>
- <param-name>transportId</param-name>
- <param-value>http://cxf.apache.org/transports/http/sse</param-value>
- </init-param>
- <init-param>
<param-name>javax.ws.rs.Application</param-name>
<param-value>org.apache.cxf.systest.jaxrs.sse.SseApplication</param-value>
</init-param>
diff --git a/systests/rs-sse/rs-sse-undertow/src/test/java/org/apache/cxf/systest/jaxrs/sse/undertow/AbstractUndertowServer.java b/systests/rs-sse/rs-sse-undertow/src/test/java/org/apache/cxf/systest/jaxrs/sse/undertow/AbstractUndertowServer.java
index c6f0a8d..c24dfef 100644
--- a/systests/rs-sse/rs-sse-undertow/src/test/java/org/apache/cxf/systest/jaxrs/sse/undertow/AbstractUndertowServer.java
+++ b/systests/rs-sse/rs-sse-undertow/src/test/java/org/apache/cxf/systest/jaxrs/sse/undertow/AbstractUndertowServer.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
import org.apache.cxf.jaxrs.servlet.CXFNonSpringJaxrsServlet;
import org.apache.cxf.systest.jaxrs.sse.BookStore;
import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
-import org.apache.cxf.transport.sse.SseHttpTransportFactory;
import io.undertow.Handlers;
import io.undertow.Undertow;
@@ -54,7 +53,6 @@ public abstract class AbstractUndertowServer extends AbstractBusTestServerBase {
.setDeploymentName("sse-test")
.addServlets(
servlet("MessageServlet", CXFNonSpringJaxrsServlet.class)
- .addInitParam(CXFNonSpringJaxrsServlet.TRANSPORT_ID, SseHttpTransportFactory.TRANSPORT_ID)
.addInitParam("jaxrs.providers", JacksonJsonProvider.class.getName())
.addInitParam("jaxrs.serviceClasses", BookStore.class.getName())
.setAsyncSupported(true)
--
To stop receiving notification emails like this one, please contact
reta@apache.org.