You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2018/05/19 16:39:02 UTC

[cxf] branch master updated: Reimplementing SSE using AsyncContext (Servlet 3.0+) (#415)

This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cxf.git


The following commit(s) were added to refs/heads/master by this push:
     new 7aa7a09  Reimplementing SSE using AsyncContext (Servlet 3.0+) (#415)
7aa7a09 is described below

commit 7aa7a09188ae74aee5075d8723c8ff5720bc45a0
Author: Andriy Redko <dr...@gmail.com>
AuthorDate: Sat May 19 12:38:54 2018 -0400

    Reimplementing SSE using AsyncContext (Servlet 3.0+) (#415)
    
    * SSE implementation using AsyncContext instead of atmosphere
    
    * SSE implementation using AsyncContext: refactorings
---
 .../main/release/samples/jax_rs/sse_cdi/pom.xml    |   5 -
 .../src/main/java/demo/jaxrs/sse/StatsServer.java  |   2 -
 .../main/release/samples/jax_rs/sse_client/pom.xml |   5 -
 .../src/main/java/demo/jaxrs/sse/StatsServer.java  |   2 -
 .../main/release/samples/jax_rs/sse_osgi/pom.xml   |   6 -
 .../main/resources/OSGI-INF/blueprint/context.xml  |   7 +-
 .../src/main/java/demo/jaxrs/sse/StatsConfig.java  |   2 -
 .../src/main/java/demo/jaxrs/sse/StatsServer.java  |   2 -
 .../main/release/samples/jax_rs/sse_tomcat/pom.xml |   5 -
 .../src/main/java/demo/jaxrs/sse/StatsServer.java  |   2 -
 .../karaf/features/src/main/resources/features.xml |   1 -
 .../apache/cxf/jaxrs/impl/AsyncResponseImpl.java   |   2 +-
 rt/rs/sse/pom.xml                                  |   4 -
 .../cxf/jaxrs/sse/OutboundSseEventBodyWriter.java  |   2 +
 .../apache/cxf/jaxrs/sse/SseBroadcasterImpl.java   |  62 ++++--
 ...vider.java => SseEventSinkContextProvider.java} |  31 +--
 .../org/apache/cxf/jaxrs/sse/SseEventSinkImpl.java | 161 ++++++++++++++++
 .../java/org/apache/cxf/jaxrs/sse/SseFeature.java  |   3 +-
 .../sse/atmosphere/SseAtmosphereEventSinkImpl.java | 144 --------------
 .../sse/atmosphere/SseAtmosphereInterceptor.java   | 181 ------------------
 .../atmosphere/SseAtmosphereInterceptorWriter.java |  31 ---
 .../ext/SseTransportCustomizationExtension.java    |  13 +-
 .../cxf/transport/sse/SseDestinationFactory.java   |  66 -------
 .../cxf/transport/sse/SseHttpTransportFactory.java |  83 --------
 .../cxf/transport/sse/SseProvidersExtension.java   |  54 ++++++
 .../AtmosphereSseServletDestination.java           | 209 ---------------------
 .../main/resources/META-INF/cxf/bus-extensions.txt |   2 +-
 systests/jaxrs/pom.xml                             |  11 +-
 systests/rs-sse/rs-sse-base/pom.xml                |   4 +
 .../cxf/systest/jaxrs/sse/AbstractSseTest.java     |  23 +++
 .../apache/cxf/systest/jaxrs/sse/BookStore.java    |  18 ++
 .../apache/cxf/systest/jaxrs/sse/BookStore2.java   |  18 ++
 .../jaxrs/sse/jetty/AbstractJettyServer.java       |   2 -
 .../src/test/resources/jaxrs_sse/WEB-INF/web.xml   |   4 -
 .../jaxrs/sse/tomcat/AbstractTomcatServer.java     |   3 -
 .../src/test/resources/jaxrs_sse/WEB-INF/web.xml   |   4 -
 .../jaxrs/sse/undertow/AbstractUndertowServer.java |   2 -
 37 files changed, 355 insertions(+), 821 deletions(-)

diff --git a/distribution/src/main/release/samples/jax_rs/sse_cdi/pom.xml b/distribution/src/main/release/samples/jax_rs/sse_cdi/pom.xml
index 44564aa..16eacbe 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_cdi/pom.xml
+++ b/distribution/src/main/release/samples/jax_rs/sse_cdi/pom.xml
@@ -88,11 +88,6 @@
             <artifactId>websocket-server</artifactId>
             <version>${cxf.jetty9.version}</version>
         </dependency>
-
-        <dependency>
-            <groupId>org.atmosphere</groupId>
-            <artifactId>atmosphere-runtime</artifactId>
-        </dependency>
         
         <dependency>
             <groupId>io.reactivex.rxjava2</groupId>
diff --git a/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/java/demo/jaxrs/sse/StatsServer.java b/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/java/demo/jaxrs/sse/StatsServer.java
index f13318d..dde5496 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/java/demo/jaxrs/sse/StatsServer.java
+++ b/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/java/demo/jaxrs/sse/StatsServer.java
@@ -19,7 +19,6 @@
 package demo.jaxrs.sse;
 
 import org.apache.cxf.cdi.CXFCdiServlet;
-import org.apache.cxf.transport.sse.SseHttpTransportFactory;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.handler.HandlerList;
 import org.eclipse.jetty.servlet.DefaultServlet;
@@ -44,7 +43,6 @@ public final class StatsServer {
          // Register and map the dispatcher servlet
         final CXFCdiServlet cxfServlet = new CXFCdiServlet();
         final ServletHolder cxfServletHolder = new ServletHolder(cxfServlet);
-        cxfServletHolder.setInitParameter(CXFCdiServlet.TRANSPORT_ID, SseHttpTransportFactory.TRANSPORT_ID);
         final ServletContextHandler context = new ServletContextHandler();
         context.setContextPath("/");
         context.addEventListener(new Listener());
diff --git a/distribution/src/main/release/samples/jax_rs/sse_client/pom.xml b/distribution/src/main/release/samples/jax_rs/sse_client/pom.xml
index d8301dc..5a9fe3f 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_client/pom.xml
+++ b/distribution/src/main/release/samples/jax_rs/sse_client/pom.xml
@@ -53,11 +53,6 @@
             <groupId>io.undertow</groupId>
             <artifactId>undertow-servlet</artifactId>
         </dependency>
-
-        <dependency>
-            <groupId>org.atmosphere</groupId>
-            <artifactId>atmosphere-runtime</artifactId>
-        </dependency>
     </dependencies>
     
     <profiles>
diff --git a/distribution/src/main/release/samples/jax_rs/sse_client/src/main/java/demo/jaxrs/sse/StatsServer.java b/distribution/src/main/release/samples/jax_rs/sse_client/src/main/java/demo/jaxrs/sse/StatsServer.java
index 77d847e..9f0c38f 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_client/src/main/java/demo/jaxrs/sse/StatsServer.java
+++ b/distribution/src/main/release/samples/jax_rs/sse_client/src/main/java/demo/jaxrs/sse/StatsServer.java
@@ -21,7 +21,6 @@ package demo.jaxrs.sse;
 import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
 
 import org.apache.cxf.jaxrs.servlet.CXFNonSpringJaxrsServlet;
-import org.apache.cxf.transport.sse.SseHttpTransportFactory;
 
 import io.undertow.Handlers;
 import io.undertow.Undertow;
@@ -44,7 +43,6 @@ public final class StatsServer {
             .setDeploymentName("sse-demo")
             .addServlets(
                 servlet("MessageServlet", CXFNonSpringJaxrsServlet.class)
-                    .addInitParam(CXFNonSpringJaxrsServlet.TRANSPORT_ID, SseHttpTransportFactory.TRANSPORT_ID)
                     .addInitParam("jaxrs.providers", JacksonJsonProvider.class.getName())
                     .addInitParam("jaxrs.serviceClasses", StatsRestServiceImpl.class.getName())
                     .setAsyncSupported(true)
diff --git a/distribution/src/main/release/samples/jax_rs/sse_osgi/pom.xml b/distribution/src/main/release/samples/jax_rs/sse_osgi/pom.xml
index d77b806..fdfedfe 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_osgi/pom.xml
+++ b/distribution/src/main/release/samples/jax_rs/sse_osgi/pom.xml
@@ -33,7 +33,6 @@
     <properties>
         <cxf.version>${project.version}</cxf.version>
         <!-- TODO remove these local entries after making the referenced dependency managed in parent/pom.xml -->
-        <cxf.atmosphere.version>2.4.3</cxf.atmosphere.version>
         <cxf.jetty92.version>9.2.15.v20160210</cxf.jetty92.version>
         <cxf.jetty93.version>9.3.5.v20151012</cxf.jetty93.version>
         <cxf.jetty.version>${cxf.jetty93.version}</cxf.jetty.version>
@@ -93,10 +92,5 @@
             <groupId>com.fasterxml.jackson.jaxrs</groupId>
             <artifactId>jackson-jaxrs-json-provider</artifactId>
         </dependency>
-        <!-- add atmosphere -->
-        <dependency>
-            <groupId>org.atmosphere</groupId>
-            <artifactId>atmosphere-runtime</artifactId>
-        </dependency>
     </dependencies>
 </project>
diff --git a/distribution/src/main/release/samples/jax_rs/sse_osgi/src/main/resources/OSGI-INF/blueprint/context.xml b/distribution/src/main/release/samples/jax_rs/sse_osgi/src/main/resources/OSGI-INF/blueprint/context.xml
index c53b4eb..36d885f 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_osgi/src/main/resources/OSGI-INF/blueprint/context.xml
+++ b/distribution/src/main/release/samples/jax_rs/sse_osgi/src/main/resources/OSGI-INF/blueprint/context.xml
@@ -31,6 +31,7 @@
     <!-- Application resources -->
     <bean id="statsResource" class="demo.jaxrs.server.StatsRestServiceImpl" />
     <bean id="jsonProvider" class="com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider" />
+    <bean id="sseFeature" class="org.apache.cxf.jaxrs.sse.SseFeature" />
  
     <cxf:bus>
         <cxf:features>
@@ -38,14 +39,16 @@
         </cxf:features>
     </cxf:bus>
 
-    <jaxrs:server id="sseSampleService" address="/" transportId="http://cxf.apache.org/transports/http/sse">
+    <jaxrs:server id="sseSampleService" address="/">
         <jaxrs:serviceBeans>
             <ref component-id="statsResource" />
         </jaxrs:serviceBeans>
         <jaxrs:providers>
-            <ref component-id="corsFilter" />
             <ref component-id="jsonProvider" />
         </jaxrs:providers>
+        <jaxrs:features>
+            <ref component-id="sseFeature" />
+        </jaxrs:features>
     </jaxrs:server>
 
 </blueprint>
diff --git a/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsConfig.java b/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsConfig.java
index 3380050..c327a08 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsConfig.java
+++ b/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsConfig.java
@@ -26,7 +26,6 @@ import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
 import org.apache.cxf.bus.spring.SpringBus;
 import org.apache.cxf.endpoint.Server;
 import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
-import org.apache.cxf.transport.sse.SseHttpTransportFactory;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.ComponentScan;
 import org.springframework.context.annotation.Configuration;
@@ -50,7 +49,6 @@ public class StatsConfig {
             .createEndpoint(new StatsApplication(), JAXRSServerFactoryBean.class);
         factory.setServiceBean(statsRestService);
         factory.setProvider(new JacksonJsonProvider());
-        factory.setTransportId(SseHttpTransportFactory.TRANSPORT_ID);
         return factory.create();
     }
 }
diff --git a/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsServer.java b/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsServer.java
index 4896045..9e23afc 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsServer.java
+++ b/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsServer.java
@@ -19,7 +19,6 @@
 package demo.jaxrs.sse;
 
 import org.apache.cxf.transport.servlet.CXFServlet;
-import org.apache.cxf.transport.sse.SseHttpTransportFactory;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.handler.HandlerList;
 import org.eclipse.jetty.servlet.DefaultServlet;
@@ -43,7 +42,6 @@ public final class StatsServer {
 
          // Register and map the dispatcher servlet
         final ServletHolder cxfServletHolder = new ServletHolder(new CXFServlet());
-        cxfServletHolder.setInitParameter(CXFServlet.TRANSPORT_ID, SseHttpTransportFactory.TRANSPORT_ID);
         final ServletContextHandler context = new ServletContextHandler();
         context.setContextPath("/");
         context.addEventListener(new ContextLoaderListener());
diff --git a/distribution/src/main/release/samples/jax_rs/sse_tomcat/pom.xml b/distribution/src/main/release/samples/jax_rs/sse_tomcat/pom.xml
index d732846..c5ca9d0 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_tomcat/pom.xml
+++ b/distribution/src/main/release/samples/jax_rs/sse_tomcat/pom.xml
@@ -90,11 +90,6 @@
             <artifactId>tomcat-embed-websocket</artifactId>
             <version>${cxf.tomcat.version}</version>
         </dependency>
-
-        <dependency>
-            <groupId>org.atmosphere</groupId>
-            <artifactId>atmosphere-runtime</artifactId>
-        </dependency>
     </dependencies>
     
     <profiles>
diff --git a/distribution/src/main/release/samples/jax_rs/sse_tomcat/src/main/java/demo/jaxrs/sse/StatsServer.java b/distribution/src/main/release/samples/jax_rs/sse_tomcat/src/main/java/demo/jaxrs/sse/StatsServer.java
index 6adad0b..b98af99 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_tomcat/src/main/java/demo/jaxrs/sse/StatsServer.java
+++ b/distribution/src/main/release/samples/jax_rs/sse_tomcat/src/main/java/demo/jaxrs/sse/StatsServer.java
@@ -29,7 +29,6 @@ import org.apache.catalina.startup.Tomcat;
 import org.apache.catalina.webresources.DirResourceSet;
 import org.apache.catalina.webresources.StandardRoot;
 import org.apache.cxf.transport.servlet.CXFServlet;
-import org.apache.cxf.transport.sse.SseHttpTransportFactory;
 import org.springframework.web.context.ContextLoaderListener;
 
 public final class StatsServer {
@@ -51,7 +50,6 @@ public final class StatsServer {
         context.setResources(resourcesFrom(context, "target/classes"));
 
         final Wrapper cxfServlet = Tomcat.addServlet(context, "cxfServlet", new CXFServlet());
-        cxfServlet.addInitParameter(CXFServlet.TRANSPORT_ID, SseHttpTransportFactory.TRANSPORT_ID);
         cxfServlet.setAsyncSupported(true);
         context.addServletMapping("/rest/*", "cxfServlet");
 
diff --git a/osgi/karaf/features/src/main/resources/features.xml b/osgi/karaf/features/src/main/resources/features.xml
index 87fe948..973a790 100644
--- a/osgi/karaf/features/src/main/resources/features.xml
+++ b/osgi/karaf/features/src/main/resources/features.xml
@@ -405,7 +405,6 @@
     <feature name="cxf-sse" version="${project.version}">
         <feature version="${project.version}">cxf-http</feature>
         <feature version="${project.version}">cxf-jaxrs</feature>
-        <bundle dependency='true'>mvn:org.atmosphere/atmosphere-runtime/${cxf.atmosphere.version}</bundle>
         <bundle start-level="40">mvn:org.apache.cxf/cxf-rt-rs-sse/${project.version}</bundle>
         <capability>
             cxf.http.provider;name=sse
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java
index 3da5e54..eb3a8d2 100644
--- a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java
@@ -261,7 +261,7 @@ public class AsyncResponseImpl implements AsyncResponse, ContinuationCallback {
     }
 
     public synchronized boolean suspendContinuationIfNeeded() {
-        if (!resumedByApplication && !cont.isPending() && !cont.isResumed()) {
+        if (!resumedByApplication && !isDone() && !cont.isPending() && !cont.isResumed()) {
             cont.suspend(AsyncResponse.NO_TIMEOUT);
             initialSuspend = false;
             return true;
diff --git a/rt/rs/sse/pom.xml b/rt/rs/sse/pom.xml
index 3da405f..58ae3af 100644
--- a/rt/rs/sse/pom.xml
+++ b/rt/rs/sse/pom.xml
@@ -58,10 +58,6 @@
             <scope>provided</scope>
         </dependency>
         <dependency>
-            <groupId>org.atmosphere</groupId>
-            <artifactId>atmosphere-runtime</artifactId>
-        </dependency>
-        <dependency>
             <groupId>org.mockito</groupId>
             <artifactId>mockito-core</artifactId>
             <version>${cxf.mockito.version}</version>
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventBodyWriter.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventBodyWriter.java
index a97020f..28ac1a4 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventBodyWriter.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventBodyWriter.java
@@ -112,6 +112,8 @@ public class OutboundSseEventBodyWriter implements MessageBodyWriter<OutboundSse
             writePayloadTo(payloadClass, payloadType, anns, p.getMediaType(), headers, p.getData(), os);
             os.write(NEW_LINE);
         }
+
+        os.write(NEW_LINE);
     }
 
     @SuppressWarnings("unchecked")
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
index 3884254..7a96bc7 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
@@ -18,37 +18,67 @@
  */
 package org.apache.cxf.jaxrs.sse;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
+import javax.servlet.AsyncContext;
+import javax.servlet.AsyncEvent;
+import javax.servlet.AsyncListener;
 import javax.ws.rs.sse.OutboundSseEvent;
 import javax.ws.rs.sse.SseBroadcaster;
 import javax.ws.rs.sse.SseEventSink;
 
 public class SseBroadcasterImpl implements SseBroadcaster {
     private final Set<SseEventSink> subscribers = new CopyOnWriteArraySet<>();
-
-    private final Set<Consumer<SseEventSink>> closers =
-            new CopyOnWriteArraySet<>();
-
-    private final Set<BiConsumer<SseEventSink, Throwable>> exceptioners =
-            new CopyOnWriteArraySet<>();
+    private final Set<Consumer<SseEventSink>> closers = new CopyOnWriteArraySet<>();
+    private final Set<BiConsumer<SseEventSink, Throwable>> exceptioners = new CopyOnWriteArraySet<>();
+    private final AtomicBoolean closed = new AtomicBoolean(false);
 
     @Override
     public void register(SseEventSink sink) {
+        assertNotClosed();
+
+        final SseEventSinkImpl sinkImpl = (SseEventSinkImpl)sink;
+        final AsyncContext ctx = sinkImpl.getAsyncContext();
+
+        ctx.addListener(new AsyncListener() {
+            @Override
+            public void onComplete(AsyncEvent asyncEvent) throws IOException {
+                subscribers.remove(sink);
+            }
+
+            @Override
+            public void onTimeout(AsyncEvent asyncEvent) throws IOException {
+                subscribers.remove(sink);
+            }
+
+            @Override
+            public void onError(AsyncEvent asyncEvent) throws IOException {
+                subscribers.remove(sink);
+            }
+
+            @Override
+            public void onStartAsync(AsyncEvent asyncEvent) throws IOException {
+
+            }
+        });
+
         subscribers.add(sink);
     }
 
     @Override
     public CompletionStage<?> broadcast(OutboundSseEvent event) {
+        assertNotClosed();
+
         final Collection<CompletableFuture<?>> futures = new ArrayList<>();
-        
         for (SseEventSink sink: subscribers) {
             try {
                 futures.add(sink.send(event).toCompletableFuture());
@@ -62,19 +92,29 @@ public class SseBroadcasterImpl implements SseBroadcaster {
 
     @Override
     public void onClose(Consumer<SseEventSink> subscriber) {
+        assertNotClosed();
         closers.add(subscriber);
     }
 
     @Override
     public void onError(BiConsumer<SseEventSink, Throwable> exceptioner) {
+        assertNotClosed();
         exceptioners.add(exceptioner);
     }
 
     @Override
     public void close() {
-        subscribers.forEach(subscriber -> {
-            subscriber.close();
-            closers.forEach(closer -> closer.accept(subscriber));
-        });
+        if (closed.compareAndSet(false, true)) {
+            subscribers.forEach(subscriber -> {
+                subscriber.close();
+                closers.forEach(closer -> closer.accept(subscriber));
+            });
+        }
+    }
+
+    private void assertNotClosed() {
+        if (closed.get()) {
+            throw new IllegalStateException("The SSE broadcaster is already closed");
+        }
     }
 }
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkContextProvider.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventSinkContextProvider.java
similarity index 64%
rename from rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkContextProvider.java
rename to rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventSinkContextProvider.java
index 8d87a3d..67e9c54 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkContextProvider.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventSinkContextProvider.java
@@ -16,22 +16,24 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.cxf.jaxrs.sse.atmosphere;
+
+
+package org.apache.cxf.jaxrs.sse;
 
 import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.ext.MessageBodyWriter;
 import javax.ws.rs.sse.OutboundSseEvent;
 import javax.ws.rs.sse.SseEventSink;
 
 import org.apache.cxf.jaxrs.ext.ContextProvider;
+import org.apache.cxf.jaxrs.impl.AsyncResponseImpl;
 import org.apache.cxf.jaxrs.provider.ServerProviderFactory;
-import org.apache.cxf.jaxrs.sse.OutboundSseEventBodyWriter;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.transport.http.AbstractHTTPDestination;
-import org.atmosphere.cpr.AtmosphereResource;
-import org.atmosphere.cpr.Broadcaster;
 
-public class SseAtmosphereEventSinkContextProvider implements ContextProvider<SseEventSink> {
+public class SseEventSinkContextProvider implements ContextProvider<SseEventSink> {
+
     @Override
     public SseEventSink createContext(Message message) {
         final HttpServletRequest request = (HttpServletRequest)message.get(AbstractHTTPDestination.HTTP_REQUEST);
@@ -39,23 +41,10 @@ public class SseAtmosphereEventSinkContextProvider implements ContextProvider<Ss
             throw new IllegalStateException("Unable to retrieve HTTP request from the context");
         }
 
-        final AtmosphereResource resource = (AtmosphereResource)request
-            .getAttribute(AtmosphereResource.class.getName());
-        if (resource == null) {
-            throw new IllegalStateException("AtmosphereResource is not present, "
-                    + "is AtmosphereServlet configured properly?");
-        }
-
-        final Broadcaster broadcaster = resource.getAtmosphereConfig()
-            .getBroadcasterFactory()
-            .lookup(resource.uuid(), true);
-
-        resource.removeFromAllBroadcasters();
-        resource.setBroadcaster(broadcaster);
-
         final MessageBodyWriter<OutboundSseEvent> writer = new OutboundSseEventBodyWriter(
             ServerProviderFactory.getInstance(message), message.getExchange());
 
-        return new SseAtmosphereEventSinkImpl(writer, resource);
+        final AsyncResponse async = new AsyncResponseImpl(message);
+        return new SseEventSinkImpl(writer, async, request.getAsyncContext());
     }
-}
+}
\ No newline at end of file
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventSinkImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventSinkImpl.java
new file mode 100644
index 0000000..f39de07
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventSinkImpl.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.jaxrs.sse;
+
+import java.lang.annotation.Annotation;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.LockSupport;
+import java.util.logging.Logger;
+
+import javax.servlet.AsyncContext;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.ext.MessageBodyWriter;
+import javax.ws.rs.sse.OutboundSseEvent;
+import javax.ws.rs.sse.SseEventSink;
+
+import org.apache.cxf.common.logging.LogUtils;
+
+public class SseEventSinkImpl implements SseEventSink {
+    private static final Annotation[] EMPTY_ANNOTATIONS = new Annotation [] {};
+    private static final Logger LOG = LogUtils.getL7dLogger(SseEventSinkImpl.class);
+    private static final int BUFFER_SIZE = 10000; // buffering 10000 messages
+
+    private final AsyncContext ctx;
+    private final MessageBodyWriter<OutboundSseEvent> writer;
+    private final Queue<QueuedEvent> buffer;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final AtomicBoolean dispatching = new AtomicBoolean(false);
+
+    public SseEventSinkImpl(final MessageBodyWriter<OutboundSseEvent> writer, 
+            final AsyncResponse async, final AsyncContext ctx) {
+        
+        this.writer = writer;
+        this.buffer = new ArrayBlockingQueue<>(BUFFER_SIZE);
+        this.ctx = ctx;
+
+        if (ctx == null) {
+            throw new IllegalStateException("Unable to retrieve the AsyncContext for this request. "
+                + "Is the Servlet configured properly?");
+        }
+
+        ctx.getResponse().setContentType(OutboundSseEventBodyWriter.SERVER_SENT_EVENTS);
+    }
+
+    public AsyncContext getAsyncContext() {
+        return ctx;
+    }
+
+    @Override
+    public void close() {
+        if (closed.compareAndSet(false, true)) {
+            // In case we are still dispatching, give the events the chance to be
+            // sent over to the consumers. The good example would be sent(event) call,
+            // immediately followed by the close() call.
+            if (!awaitQueueToDrain(5, TimeUnit.SECONDS)) {
+                LOG.warning("There are still SSE events the queue which may not be delivered (closing now)");
+            }
+            
+            try {
+                ctx.complete();
+            } catch (final IllegalStateException ex) {
+                LOG.warning("Failed to close the AsyncContext cleanly: " + ex.getMessage());
+            }
+        }
+    }
+
+    private boolean awaitQueueToDrain(int timeout, TimeUnit unit) {
+        final long parkTime = unit.toNanos(timeout) / 20;
+        int attempt = 0;
+        
+        while (dispatching.get() && ++attempt < 20) {
+            LockSupport.parkNanos(parkTime);
+        }
+        
+        return buffer.isEmpty();
+    }
+
+    @Override
+    public boolean isClosed() {
+        return closed.get();
+    }
+
+    @Override
+    public CompletionStage<?> send(OutboundSseEvent event) {
+        final CompletableFuture<?> future = new CompletableFuture<>();
+
+        if (!closed.get() && writer != null) {
+            if (buffer.offer(new QueuedEvent(event, future))) {
+                if (dispatching.compareAndSet(false, true)) {
+                    ctx.start(this::dequeue);
+                }
+            } else {
+                future.completeExceptionally(new IllegalStateException(
+                    "The buffer is full (10000), unable to queue SSE event for send"));
+            }
+        } else {
+            future.completeExceptionally(new IllegalStateException(
+                "The sink is already closed, unable to queue SSE event for send"));
+        }
+
+        return future;
+    }
+
+    private void dequeue() {
+        try {
+            while (true) {
+                final QueuedEvent qeuedEvent = buffer.poll();
+                
+                // Nothing queued, release the thread
+                if (qeuedEvent == null) {
+                    break;
+                }
+                
+                final OutboundSseEvent event = qeuedEvent.event;
+                final CompletableFuture<?> future = qeuedEvent.completion;
+    
+                try {
+                    writer.writeTo(event, event.getClass(), event.getGenericType(), EMPTY_ANNOTATIONS,
+                        event.getMediaType(), null, ctx.getResponse().getOutputStream());
+                    ctx.getResponse().flushBuffer();
+                    future.complete(null);
+                } catch (final Exception ex) {
+                    future.completeExceptionally(ex);
+                }
+            }
+        } finally {
+            dispatching.set(false);
+        }
+    }
+
+    private static class QueuedEvent {
+        private final OutboundSseEvent event;
+        private final CompletableFuture<?> completion;
+
+        QueuedEvent(OutboundSseEvent event, CompletableFuture<?> completion) {
+            this.event = event;
+            this.completion = completion;
+        }
+    }
+}
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java
index 36ea9ed..9ecbe27 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java
@@ -28,7 +28,6 @@ import org.apache.cxf.annotations.Provider.Type;
 import org.apache.cxf.endpoint.Server;
 import org.apache.cxf.feature.AbstractFeature;
 import org.apache.cxf.jaxrs.provider.ServerProviderFactory;
-import org.apache.cxf.jaxrs.sse.atmosphere.SseAtmosphereEventSinkContextProvider;
 
 @Provider(value = Type.Feature, scope = Scope.Server)
 public class SseFeature extends AbstractFeature {
@@ -36,8 +35,8 @@ public class SseFeature extends AbstractFeature {
     public void initialize(Server server, Bus bus) {
         final List<Object> providers = new ArrayList<>();
 
-        providers.add(new SseAtmosphereEventSinkContextProvider());
         providers.add(new SseContextProvider());
+        providers.add(new SseEventSinkContextProvider());
 
         ((ServerProviderFactory) server.getEndpoint().get(
             ServerProviderFactory.class.getName())).setUserProviders(providers);
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkImpl.java
deleted file mode 100644
index af984e7..0000000
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkImpl.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.sse.atmosphere;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.lang.annotation.Annotation;
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.logging.Logger;
-
-import javax.ws.rs.ext.MessageBodyWriter;
-import javax.ws.rs.sse.OutboundSseEvent;
-import javax.ws.rs.sse.SseEventSink;
-
-import org.apache.cxf.common.logging.LogUtils;
-import org.atmosphere.cpr.AtmosphereResource;
-import org.atmosphere.cpr.AtmosphereResponse;
-import org.atmosphere.cpr.Broadcaster;
-
-import static org.atmosphere.cpr.ApplicationConfig.PROPERTY_USE_STREAM;
-
-public class SseAtmosphereEventSinkImpl implements SseEventSink {
-    private static final Logger LOG = LogUtils.getL7dLogger(SseAtmosphereEventSinkImpl.class);
-
-    private final AtmosphereResource resource;
-    private final MessageBodyWriter<OutboundSseEvent> writer;
-    private final boolean usingStream;
-    
-    private volatile boolean closed;
-
-    public SseAtmosphereEventSinkImpl(final MessageBodyWriter<OutboundSseEvent> writer,
-            final AtmosphereResource resource) {
-        this.writer = writer;
-        this.resource = resource;
-        this.usingStream = (Boolean)resource.getRequest().getAttribute(PROPERTY_USE_STREAM);
-
-        if (!resource.isSuspended()) {
-            LOG.fine("Atmosphere resource is not suspended, suspending");
-            resource.suspend();
-        }
-    }
-
-    @Override
-    public void close() {
-        if (!closed) {
-            closed = true;
-
-            LOG.fine("Closing Atmosphere SSE event output");
-            if (resource.isSuspended()) {
-                LOG.fine("Atmosphere resource is suspended, resuming");
-                resource.resume();
-            }
-
-            final Broadcaster broadcaster = resource.getBroadcaster();
-            resource.removeFromAllBroadcasters();
-
-            try {
-                final AtmosphereResponse response = resource.getResponse();
-                
-                try {
-                    // The resource's request property is null here, we have to
-                    // rely on initial request to understand what to close, response
-                    // or stream.
-                    if (usingStream) {
-                        response.getOutputStream().close();
-                    } else {
-                        response.getWriter().close();
-                    }                    
-                } catch (final IOException ex) {
-                    LOG.warning("Failed to flush AtmosphereResponse buffer: "
-                        + ex.getMessage());
-                }
-            } finally {
-                try {
-                    resource.close();
-                } catch (IOException ex) {
-                    // ignore
-                }
-                broadcaster.destroy();
-                LOG.fine("Atmosphere SSE event output is closed");
-            }
-        }
-    }
-
-    @Override
-    public CompletionStage<?> send(OutboundSseEvent event) {
-        final CompletableFuture<?> future = new CompletableFuture<>();
-        
-        if (!closed && writer != null) {
-            try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
-                writer.writeTo(event, event.getClass(), null, new Annotation [] {}, event.getMediaType(), null, os);
-
-                // Atmosphere broadcasts asynchronously which is acceptable in most cases.
-                // Unfortunately, calling close() may lead to response stream being closed
-                // while there are still some SSE delivery scheduled.
-                return CompletableFuture.completedFuture(
-                    resource
-                        .getBroadcaster()
-                        .broadcast(os.toString(StandardCharsets.UTF_8.name()))
-                        .get(1, TimeUnit.SECONDS)
-                    );
-            } catch (final IOException ex) {
-                LOG.warning("While writing the SSE event, an exception was raised: " + ex);
-                future.completeExceptionally(ex);
-            } catch (final ExecutionException | InterruptedException ex) {
-                LOG.warning("SSE Atmosphere response was not delivered");
-                future.completeExceptionally(ex);
-            } catch (final TimeoutException ex) {
-                LOG.warning("SSE Atmosphere response was not delivered within default timeout");
-                future.completeExceptionally(ex);
-            }
-        } else {
-            future.complete(null);
-        }
-        
-        return future;
-    }
-
-    @Override
-    public boolean isClosed() {
-        return closed;
-    }
-}
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptor.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptor.java
deleted file mode 100644
index ab8ef5c..0000000
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptor.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.sse.atmosphere;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.PrintWriter;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.cxf.common.logging.LogUtils;
-import org.atmosphere.cpr.Action;
-import org.atmosphere.cpr.AsyncIOInterceptorAdapter;
-import org.atmosphere.cpr.AsyncIOWriter;
-import org.atmosphere.cpr.AtmosphereInterceptorWriter;
-import org.atmosphere.cpr.AtmosphereRequest;
-import org.atmosphere.cpr.AtmosphereResource;
-import org.atmosphere.cpr.AtmosphereResourceEvent;
-import org.atmosphere.cpr.AtmosphereResourceEventListenerAdapter.OnPreSuspend;
-import org.atmosphere.cpr.AtmosphereResponse;
-import org.atmosphere.interceptor.AllowInterceptor;
-import org.atmosphere.interceptor.SSEAtmosphereInterceptor;
-import org.atmosphere.util.Utils;
-
-import static org.apache.cxf.jaxrs.sse.OutboundSseEventBodyWriter.SERVER_SENT_EVENTS;
-import static org.atmosphere.cpr.ApplicationConfig.PROPERTY_USE_STREAM;
-import static org.atmosphere.cpr.FrameworkConfig.CALLBACK_JAVASCRIPT_PROTOCOL;
-import static org.atmosphere.cpr.FrameworkConfig.CONTAINER_RESPONSE;
-
-/**
- * Most of this class implementation is borrowed from SSEAtmosphereInterceptor. The original
- * implementation does two things which do not fit well into SSE support:
- *  - closes the response stream (overridden by SseAtmosphereInterceptorWriter)
- *  - wraps the whatever object is being written to SSE payload (overridden using
- *    the complete SSE protocol)
- */
-public class SseAtmosphereInterceptor extends SSEAtmosphereInterceptor {
-    private static final Logger LOG = LogUtils.getL7dLogger(SseAtmosphereInterceptor.class);
-
-    private static final byte[] PADDING;
-    private static final String PADDING_TEXT;
-    private static final byte[] END = "\r\n\r\n".getBytes();
-
-    static {
-        StringBuffer whitespace = new StringBuffer();
-        for (int i = 0; i < 2000; i++) {
-            whitespace.append(" ");
-        }
-        whitespace.append("\n");
-        PADDING_TEXT = whitespace.toString();
-        PADDING = PADDING_TEXT.getBytes();
-    }
-
-    private boolean writePadding(AtmosphereResponse response) {
-        if (response.request() != null && response.request().getAttribute("paddingWritten") != null) {
-            return false;
-        }
-
-        response.setContentType(SERVER_SENT_EVENTS);
-        response.setCharacterEncoding("utf-8");
-        boolean isUsingStream = (Boolean) response.request().getAttribute(PROPERTY_USE_STREAM);
-        if (isUsingStream) {
-            try {
-                OutputStream stream = response.getResponse().getOutputStream();
-                try {
-                    stream.write(PADDING);
-                    stream.flush();
-                } catch (IOException ex) {
-                    LOG.log(Level.WARNING, "SSE may not work", ex);
-                }
-            } catch (IOException e) {
-                LOG.log(Level.FINEST, "", e);
-            }
-        } else {
-            try {
-                PrintWriter w = response.getResponse().getWriter();
-                w.println(PADDING_TEXT);
-                w.flush();
-            } catch (IOException e) {
-                LOG.log(Level.FINEST, "", e);
-            }
-        }
-        response.resource().getRequest().setAttribute("paddingWritten", "true");
-        return true;
-    }
-
-    @Override
-    public Action inspect(final AtmosphereResource r) {
-        if (Utils.webSocketMessage(r)) {
-            return Action.CONTINUE;
-        }
-
-        final AtmosphereRequest request = r.getRequest();
-        final String accept = request.getHeader("Accept") == null ? "text/plain" : request.getHeader("Accept").trim();
-
-        if (r.transport().equals(AtmosphereResource.TRANSPORT.SSE) || SERVER_SENT_EVENTS.equalsIgnoreCase(accept)) {
-            final AtmosphereResponse response = r.getResponse();
-            if (response.getAsyncIOWriter() == null) {
-                response.asyncIOWriter(new SseAtmosphereInterceptorWriter());
-            }
-
-            r.addEventListener(new P(response));
-
-            AsyncIOWriter writer = response.getAsyncIOWriter();
-            if (AtmosphereInterceptorWriter.class.isAssignableFrom(writer.getClass())) {
-                AtmosphereInterceptorWriter.class.cast(writer).interceptor(new AsyncIOInterceptorAdapter() {
-                    private boolean padding() {
-                        if (!r.isSuspended()) {
-                            return writePadding(response);
-                        }
-                        return false;
-                    }
-
-                    @Override
-                    public void prePayload(AtmosphereResponse response, byte[] data, int offset, int length) {
-                        padding();
-                    }
-
-                    @Override
-                    public void postPayload(AtmosphereResponse response, byte[] data, int offset, int length) {
-                        // The CALLBACK_JAVASCRIPT_PROTOCOL may be called by a framework running on top of Atmosphere
-                        // In that case, we must pad/protocol indenendently of the state of the AtmosphereResource
-                        if (r.isSuspended() || r.getRequest().getAttribute(CALLBACK_JAVASCRIPT_PROTOCOL) != null
-                                || r.getRequest().getAttribute(CONTAINER_RESPONSE) != null) {
-                            response.write(END, true);
-                        }
-
-                        /**
-                         * When used with https://github.com/remy/polyfills/blob/master/EventSource.js , we
-                         * resume after every message.
-                         */
-                        String ua = r.getRequest().getHeader("User-Agent");
-                        if (ua != null && ua.contains("MSIE")) {
-                            try {
-                                response.flushBuffer();
-                            } catch (IOException e) {
-                                LOG.log(Level.FINEST, "", e);
-                            }
-                            r.resume();
-                        }
-                    }
-                });
-            } else {
-                LOG.warning(String.format("Unable to apply %s. Your AsyncIOWriter must implement %s",
-                    getClass().getName(), AtmosphereInterceptorWriter.class.getName()));
-            }
-        }
-
-        return Action.CONTINUE;
-    }
-
-    private final class P extends OnPreSuspend implements AllowInterceptor {
-
-        private final AtmosphereResponse response;
-
-        private P(AtmosphereResponse response) {
-            this.response = response;
-        }
-
-        @Override
-        public void onPreSuspend(AtmosphereResourceEvent event) {
-            writePadding(response);
-        }
-    }
-}
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptorWriter.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptorWriter.java
deleted file mode 100644
index cfafe87..0000000
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptorWriter.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.sse.atmosphere;
-
-import java.io.IOException;
-
-import org.atmosphere.cpr.AtmosphereInterceptorWriter;
-import org.atmosphere.cpr.AtmosphereResponse;
-
-public class SseAtmosphereInterceptorWriter extends AtmosphereInterceptorWriter {
-    @Override
-    public void close(AtmosphereResponse response) throws IOException {
-        // Do not close the response, keep output stream open
-    }
-}
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/ext/SseTransportCustomizationExtension.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/ext/SseTransportCustomizationExtension.java
index b0123d0..c6267d4 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/ext/SseTransportCustomizationExtension.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/ext/SseTransportCustomizationExtension.java
@@ -18,24 +18,15 @@
  */
 package org.apache.cxf.jaxrs.sse.ext;
 
-import org.apache.cxf.Bus;
 import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
 import org.apache.cxf.jaxrs.ext.JAXRSServerFactoryCustomizationExtension;
 import org.apache.cxf.jaxrs.sse.SseContextProvider;
-import org.apache.cxf.jaxrs.sse.atmosphere.SseAtmosphereEventSinkContextProvider;
-import org.apache.cxf.transport.AbstractTransportFactory;
-import org.apache.cxf.transport.sse.SseHttpTransportFactory;
+import org.apache.cxf.jaxrs.sse.SseEventSinkContextProvider;
 
 public class SseTransportCustomizationExtension implements JAXRSServerFactoryCustomizationExtension {
     @Override
     public void customize(final JAXRSServerFactoryBean bean) {
-        bean.setTransportId(SseHttpTransportFactory.TRANSPORT_ID);
         bean.setProvider(new SseContextProvider());
-        bean.setProvider(new SseAtmosphereEventSinkContextProvider());
-        
-        final Bus bus = bean.getBus();
-        if (bus != null) {
-            bus.setProperty(AbstractTransportFactory.PREFERRED_TRANSPORT_ID, SseHttpTransportFactory.TRANSPORT_ID);
-        }
+        bean.setProvider(new SseEventSinkContextProvider());
     }
 }
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseDestinationFactory.java b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseDestinationFactory.java
deleted file mode 100644
index b1ca29d..0000000
--- a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseDestinationFactory.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.transport.sse;
-
-import java.io.IOException;
-import java.util.logging.Logger;
-
-import org.apache.cxf.Bus;
-import org.apache.cxf.common.logging.LogUtils;
-import org.apache.cxf.service.model.EndpointInfo;
-import org.apache.cxf.transport.DestinationFactory;
-import org.apache.cxf.transport.DestinationFactoryManager;
-import org.apache.cxf.transport.http.AbstractHTTPDestination;
-import org.apache.cxf.transport.http.DestinationRegistry;
-import org.apache.cxf.transport.http.HTTPTransportFactory;
-import org.apache.cxf.transport.http.HttpDestinationFactory;
-import org.apache.cxf.transport.sse.atmosphere.AtmosphereSseServletDestination;
-
-public class SseDestinationFactory implements HttpDestinationFactory {
-    private static final Logger LOG = LogUtils.getL7dLogger(SseDestinationFactory.class);
-    
-    @Override
-    public AbstractHTTPDestination createDestination(EndpointInfo endpointInfo, Bus bus, 
-            DestinationRegistry registry) throws IOException {
-        return new AtmosphereSseServletDestination(bus, getDestinationRegistry(bus), endpointInfo, 
-            endpointInfo.getAddress());
-    } 
-    
-    /**
-     * The destination factory should be taken from HTTP transport as a workaround to 
-     * register the destinations properly in the OSGi container.
-     */
-    private static DestinationRegistry getDestinationRegistry(Bus bus) {
-        DestinationFactoryManager dfm = bus.getExtension(DestinationFactoryManager.class);
-        
-        try {
-            DestinationFactory df = dfm.getDestinationFactory("http://cxf.apache.org/transports/http/configuration");
-            if (df instanceof HTTPTransportFactory) {
-                HTTPTransportFactory transportFactory = (HTTPTransportFactory)df;
-                return transportFactory.getRegistry();
-            }
-        } catch (final Exception ex) {
-            LOG.warning("Unable to deduct the destination registry from HTTP transport: " + ex.getMessage());
-        }
-        
-        return null;
-    }
-
-
-}
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseHttpTransportFactory.java b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseHttpTransportFactory.java
deleted file mode 100644
index a1f0d68..0000000
--- a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseHttpTransportFactory.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.transport.sse;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.cxf.Bus;
-import org.apache.cxf.common.injection.NoJSR250Annotations;
-import org.apache.cxf.service.model.EndpointInfo;
-import org.apache.cxf.transport.ConduitInitiator;
-import org.apache.cxf.transport.Destination;
-import org.apache.cxf.transport.DestinationFactory;
-import org.apache.cxf.transport.http.AbstractHTTPDestination;
-import org.apache.cxf.transport.http.DestinationRegistry;
-import org.apache.cxf.transport.http.HTTPTransportFactory;
-
-@NoJSR250Annotations
-public class SseHttpTransportFactory extends HTTPTransportFactory
-        implements ConduitInitiator, DestinationFactory {
-
-    public static final String TRANSPORT_ID = "http://cxf.apache.org/transports/http/sse";
-    public static final List<String> DEFAULT_NAMESPACES = Collections.unmodifiableList(Arrays.asList(
-        TRANSPORT_ID,
-        "http://cxf.apache.org/transports/http/sse/configuration"
-    ));
-    
-    private final SseDestinationFactory factory = new SseDestinationFactory();
-
-    public SseHttpTransportFactory() {
-        this(null);
-    }
-
-    public SseHttpTransportFactory(DestinationRegistry registry) {
-        super(DEFAULT_NAMESPACES, registry);
-    }
-
-    @Override
-    public Destination getDestination(EndpointInfo endpointInfo, Bus bus) throws IOException {
-        if (endpointInfo == null) {
-            throw new IllegalArgumentException("EndpointInfo cannot be null");
-        }
-        
-        // In order to register the destination in the OSGi container, we have to 
-        // include it into 2 registries basically: for HTTP transport and the current 
-        // one. The workaround is borrow from org.apache.cxf.transport.websocket.WebSocketTransportFactory,
-        // it seems like no better option exists at the moment.
-        synchronized (registry) {
-            AbstractHTTPDestination d = registry.getDestinationForPath(endpointInfo.getAddress());
-            
-            if (d == null) {
-                d = factory.createDestination(endpointInfo, bus, registry);
-                if (d == null) {
-                    throw new IOException("No destination available. The CXF SSE transport needs Atmosphere"
-                        + " dependencies to be available");
-                }
-                registry.addDestination(d);
-                configure(bus, d);
-                d.finalizeConfig();
-            }
-            
-            return d;
-        }
-    }
-}
\ No newline at end of file
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseProvidersExtension.java b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseProvidersExtension.java
new file mode 100644
index 0000000..0277611
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseProvidersExtension.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.transport.sse;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.buslifecycle.BusCreationListener;
+import org.apache.cxf.helpers.CastUtils;
+import org.apache.cxf.jaxrs.sse.SseContextProvider;
+import org.apache.cxf.jaxrs.sse.SseEventSinkContextProvider;
+
+public class SseProvidersExtension implements BusCreationListener {
+
+    private static final String BUS_PROVIDERS = "org.apache.cxf.jaxrs.bus.providers";
+
+    @Override
+    public void busCreated(Bus bus) {
+        Object providers = bus.getProperty(BUS_PROVIDERS);
+        
+        final List<?> sseProviders = 
+            Arrays.asList(
+                new SseContextProvider(), 
+                new SseEventSinkContextProvider()
+            );
+        
+        if (providers instanceof List) {
+            final List<?> existing = new ArrayList<>((List<?>)providers);
+            existing.addAll(CastUtils.cast(sseProviders));
+            bus.setProperty(BUS_PROVIDERS, existing);
+        } else {
+            bus.setProperty(BUS_PROVIDERS, sseProviders);
+        }
+    }
+    
+}
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/atmosphere/AtmosphereSseServletDestination.java b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/atmosphere/AtmosphereSseServletDestination.java
deleted file mode 100644
index add8afa..0000000
--- a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/atmosphere/AtmosphereSseServletDestination.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cxf.transport.sse.atmosphere;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.List;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.servlet.ServletConfig;
-import javax.servlet.ServletContext;
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import javax.ws.rs.core.HttpHeaders;
-import javax.ws.rs.core.MediaType;
-
-import org.apache.cxf.Bus;
-import org.apache.cxf.common.logging.LogUtils;
-import org.apache.cxf.jaxrs.sse.SseFeature;
-import org.apache.cxf.jaxrs.sse.atmosphere.SseAtmosphereInterceptor;
-import org.apache.cxf.message.Message;
-import org.apache.cxf.service.model.EndpointInfo;
-import org.apache.cxf.transport.http.DestinationRegistry;
-import org.apache.cxf.transport.http.Headers;
-import org.apache.cxf.transport.servlet.ServletDestination;
-import org.atmosphere.cache.UUIDBroadcasterCache;
-import org.atmosphere.cpr.ApplicationConfig;
-import org.atmosphere.cpr.AtmosphereFramework;
-import org.atmosphere.cpr.AtmosphereRequestImpl;
-import org.atmosphere.cpr.AtmosphereResource;
-import org.atmosphere.cpr.AtmosphereResponseImpl;
-import org.atmosphere.handler.AbstractReflectorAtmosphereHandler;
-
-public class AtmosphereSseServletDestination extends ServletDestination {
-    private static final Logger LOG = LogUtils.getL7dLogger(AtmosphereSseServletDestination.class);
-
-    private AtmosphereFramework framework;
-
-    public AtmosphereSseServletDestination(Bus bus, DestinationRegistry registry,
-            EndpointInfo ei, String path) throws IOException {
-        super(bus, registry, ei, path);
-
-        framework = create();
-
-        bus.getFeatures().add(new SseFeature());
-    }
-
-    private AtmosphereFramework create() {
-        final AtmosphereFramework instance = new AtmosphereFramework(true, false);
-        
-        instance.interceptor(new SseAtmosphereInterceptor());
-        instance.addInitParameter(ApplicationConfig.PROPERTY_NATIVE_COMETSUPPORT, "true");
-        instance.addInitParameter(ApplicationConfig.WEBSOCKET_SUPPORT, "true");
-        instance.addInitParameter(ApplicationConfig.DISABLE_ATMOSPHEREINTERCEPTOR, "true");
-        instance.addInitParameter(ApplicationConfig.CLOSE_STREAM_ON_CANCEL, "true");
-        // Atmosphere does not limit amount of threads and application can crash in no time
-        // https://github.com/Atmosphere/atmosphere/wiki/Configuring-Atmosphere-for-Performance
-        instance.addInitParameter(ApplicationConfig.BROADCASTER_MESSAGE_PROCESSING_THREADPOOL_MAXSIZE, "20");
-        instance.addInitParameter(ApplicationConfig.BROADCASTER_ASYNC_WRITE_THREADPOOL_MAXSIZE, "20");
-        // workaround for atmosphere's jsr356 initialization requiring servletConfig
-        instance.addInitParameter(ApplicationConfig.WEBSOCKET_SUPPRESS_JSR356, "true");
-
-        instance.setBroadcasterCacheClassName(UUIDBroadcasterCache.class.getName());
-        instance.addAtmosphereHandler("/", new DestinationHandler());
-        
-        return instance;
-    }
-    
-    @Override
-    public void onServletConfigAvailable(ServletConfig config) throws ServletException {
-        // Very likely there is JSR-356 implementation available, let us reconfigure the Atmosphere framework
-        // to use it since ServletConfig instance is already available.
-        final Object container = config.getServletContext()
-            .getAttribute("javax.websocket.server.ServerContainer");
-
-        if (container != null) {
-            if (framework.initialized()) {
-                framework.destroy();
-            }
-            
-            framework = create();
-            framework.addInitParameter(ApplicationConfig.PROPERTY_NATIVE_COMETSUPPORT, "false");
-            framework.addInitParameter(ApplicationConfig.WEBSOCKET_SUPPRESS_JSR356, "false");
-            
-            framework.init(config);
-        }
-    }
-    
-    @Override
-    public void finalizeConfig() {
-        if (framework.initialized()) {
-            return;
-        }
-        
-        final ServletContext ctx = bus.getExtension(ServletContext.class);
-        if (ctx != null) {
-            try {
-                framework.init(new ServletConfig() {
-                    @Override
-                    public String getServletName() {
-                        return null;
-                    }
-                    @Override
-                    public ServletContext getServletContext() {
-                        return ctx;
-                    }
-                    @Override
-                    public String getInitParameter(String name) {
-                        return null;
-                    }
-
-                    @Override
-                    public Enumeration<String> getInitParameterNames() {
-                        return null;
-                    }
-                });
-            } catch (ServletException e) {
-                // TODO Auto-generated catch block
-                e.printStackTrace();
-            }
-        } else {
-            framework.init();
-        }
-    }
-
-    @Override
-    public void invoke(ServletConfig config, ServletContext context, HttpServletRequest req,
-            HttpServletResponse resp) throws IOException {
-        try {
-            framework.doCometSupport(AtmosphereRequestImpl.wrap(req), AtmosphereResponseImpl.wrap(resp));
-        } catch (ServletException e) {
-            throw new IOException(e);
-        }
-    }
-
-    @Override
-    public void shutdown() {
-        try {
-            framework.destroy();
-        } catch (Exception ex) {
-            LOG.warning("Graceful shutdown was not successful: " + ex.getMessage());
-        } finally {
-            super.shutdown();
-        }
-    }
-
-    private class DestinationHandler extends AbstractReflectorAtmosphereHandler {
-        @Override
-        public void onRequest(final AtmosphereResource resource) throws IOException {
-            LOG.fine("onRequest");
-            try {
-                AtmosphereSseServletDestination.super.invoke(null, resource.getRequest().getServletContext(),
-                    resource.getRequest(), resource.getResponse());
-            } catch (Exception e) {
-                LOG.log(Level.WARNING, "Failed to invoke service", e);
-            }
-        }
-    }
-
-    @Override
-    protected OutputStream flushHeaders(Message outMessage, boolean getStream) throws IOException {
-        adjustContentLength(outMessage);
-        return super.flushHeaders(outMessage, getStream);
-    }
-
-    @Override
-    protected OutputStream flushHeaders(Message outMessage) throws IOException {
-        adjustContentLength(outMessage);
-        return super.flushHeaders(outMessage);
-    }
-
-    /**
-     * It has been noticed that Jetty checks the "Content-Length" header and completes the
-     * response if its value is 0 (or matches the number of bytes written). However, in case
-     * of SSE the content length is unknown so we are setting it to -1 before flushing the
-     * response. Otherwise, only the first event is going to be sent and response is going to
-     * be closed.
-     */
-    private void adjustContentLength(Message outMessage) {
-        final String contentType = (String)outMessage.get(Message.CONTENT_TYPE);
-
-        if (MediaType.SERVER_SENT_EVENTS.equalsIgnoreCase(contentType)) {
-            final Map<String, List<String>> headers = Headers.getSetProtocolHeaders(outMessage);
-            headers.put(HttpHeaders.CONTENT_LENGTH, Collections.singletonList("-1"));
-        }
-    }
-}
diff --git a/rt/rs/sse/src/main/resources/META-INF/cxf/bus-extensions.txt b/rt/rs/sse/src/main/resources/META-INF/cxf/bus-extensions.txt
index 643b51c..97a6b9f 100644
--- a/rt/rs/sse/src/main/resources/META-INF/cxf/bus-extensions.txt
+++ b/rt/rs/sse/src/main/resources/META-INF/cxf/bus-extensions.txt
@@ -1 +1 @@
-org.apache.cxf.transport.sse.SseHttpTransportFactory::true
\ No newline at end of file
+org.apache.cxf.transport.sse.SseProvidersExtension::true
\ No newline at end of file
diff --git a/systests/jaxrs/pom.xml b/systests/jaxrs/pom.xml
index ee2fc13..73e3d0b 100644
--- a/systests/jaxrs/pom.xml
+++ b/systests/jaxrs/pom.xml
@@ -537,17 +537,14 @@
            <scope>test</scope>
         </dependency>
         <dependency>
-            <!-- activate atmosphere. Its use can be disabled by setting
-                 org.apache.cxf.transport.websocket.atmosphere.disabled to "true" -->
-            <groupId>org.atmosphere</groupId>
-            <artifactId>atmosphere-runtime</artifactId>
-            <version>${cxf.atmosphere.version}</version>
-        </dependency>
-        <dependency>
             <groupId>org.qi4j.library</groupId>
             <artifactId>org.qi4j.library.circuitbreaker</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git a/systests/rs-sse/rs-sse-base/pom.xml b/systests/rs-sse/rs-sse-base/pom.xml
index 04584c2..260f609 100644
--- a/systests/rs-sse/rs-sse-base/pom.xml
+++ b/systests/rs-sse/rs-sse-base/pom.xml
@@ -49,5 +49,9 @@
             <artifactId>cxf-testutils</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
     </dependencies>
 </project>
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
index 5c8bf99..1e32b4b 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
@@ -177,6 +177,29 @@ public abstract class AbstractSseTest extends AbstractSseBaseTest {
         r.close();
     }
 
+    @Test
+    public void testBooksStreamIsReturnedFromInboundSseEventsNoDelay() throws InterruptedException {
+        final WebTarget target = createWebTarget("/rest/api/bookstore/nodelay/sse/0");
+        final Collection<Book> books = new ArrayList<>();
+        
+        try (SseEventSource eventSource = SseEventSource.target(target).build()) {
+            eventSource.register(collect(books), System.out::println);
+            eventSource.open();
+            // Give the SSE stream some time to collect all events
+            awaitEvents(5000, books, 5);
+        }
+        // Easing the test verification here, it does not work well for Atm + Jetty
+        assertThat(books, 
+            hasItems(
+                new Book("New Book #1", 1), 
+                new Book("New Book #2", 2), 
+                new Book("New Book #3", 3), 
+                new Book("New Book #4", 4),
+                new Book("New Book #5", 5)
+            )
+        );
+    }
+
     private static Consumer<InboundSseEvent> collect(final Collection< Book > books) {
         return event -> books.add(event.readData(Book.class, MediaType.APPLICATION_JSON_TYPE));
     }
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
index ec6b1c0..ad2387f 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
@@ -20,6 +20,7 @@ package org.apache.cxf.systest.jaxrs.sse;
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -92,6 +93,23 @@ public class BookStore {
             }
         }.start();
     }
+    
+    @GET
+    @Path("nodelay/sse/{id}")
+    @Produces(MediaType.SERVER_SENT_EVENTS)
+    public void forBookNoDelay(@Context SseEventSink sink, @PathParam("id") final String id) {
+        final Builder builder = sse.newEventBuilder();
+        
+        CompletableFuture
+            .runAsync(() -> {
+                sink.send(createStatsEvent(builder.name("book"), 1));
+                sink.send(createStatsEvent(builder.name("book"), 2));
+                sink.send(createStatsEvent(builder.name("book"), 3));
+                sink.send(createStatsEvent(builder.name("book"), 4));
+                sink.send(createStatsEvent(builder.name("book"), 5));
+            })
+            .whenComplete((r, ex) -> sink.close());
+    }
 
     @GET
     @Path("nodata")
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
index dc76876..f78c5ce 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
@@ -20,6 +20,7 @@ package org.apache.cxf.systest.jaxrs.sse;
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -93,6 +94,23 @@ public class BookStore2 {
     }
 
     @GET
+    @Path("nodelay/sse/{id}")
+    @Produces(MediaType.SERVER_SENT_EVENTS)
+    public void forBookNoDelay(@Context SseEventSink sink, @PathParam("id") final String id) {
+        final Builder builder = sse.newEventBuilder();
+        
+        CompletableFuture
+            .runAsync(() -> {
+                sink.send(createStatsEvent(builder.name("book"), 1));
+                sink.send(createStatsEvent(builder.name("book"), 2));
+                sink.send(createStatsEvent(builder.name("book"), 3));
+                sink.send(createStatsEvent(builder.name("book"), 4));
+                sink.send(createStatsEvent(builder.name("book"), 5));
+            })
+            .whenComplete((r, ex) -> sink.close());
+    }
+
+    @GET
     @Path("broadcast/sse")
     @Produces(MediaType.SERVER_SENT_EVENTS)
     public void broadcast(@Context SseEventSink sink) {
diff --git a/systests/rs-sse/rs-sse-jetty/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/AbstractJettyServer.java b/systests/rs-sse/rs-sse-jetty/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/AbstractJettyServer.java
index a47c0c3..89deec5 100644
--- a/systests/rs-sse/rs-sse-jetty/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/AbstractJettyServer.java
+++ b/systests/rs-sse/rs-sse-jetty/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/AbstractJettyServer.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
 import org.apache.cxf.jaxrs.servlet.CXFNonSpringJaxrsServlet;
 import org.apache.cxf.systest.jaxrs.sse.BookStore;
 import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
-import org.apache.cxf.transport.sse.SseHttpTransportFactory;
 import org.eclipse.jetty.server.Handler;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.handler.DefaultHandler;
@@ -57,7 +56,6 @@ public abstract class AbstractJettyServer extends AbstractBusTestServerBase {
             if (resourcePath == null) {
                 // Register and map the dispatcher servlet
                 final ServletHolder holder = new ServletHolder(new CXFNonSpringJaxrsServlet());
-                holder.setInitParameter(CXFNonSpringJaxrsServlet.TRANSPORT_ID, SseHttpTransportFactory.TRANSPORT_ID);
                 holder.setInitParameter("jaxrs.serviceClasses", BookStore.class.getName());
                 holder.setInitParameter("jaxrs.providers", JacksonJsonProvider.class.getName());
                 final ServletContextHandler context = new ServletContextHandler();
diff --git a/systests/rs-sse/rs-sse-jetty/src/test/resources/jaxrs_sse/WEB-INF/web.xml b/systests/rs-sse/rs-sse-jetty/src/test/resources/jaxrs_sse/WEB-INF/web.xml
index 64746acb..8133ebf 100644
--- a/systests/rs-sse/rs-sse-jetty/src/test/resources/jaxrs_sse/WEB-INF/web.xml
+++ b/systests/rs-sse/rs-sse-jetty/src/test/resources/jaxrs_sse/WEB-INF/web.xml
@@ -8,10 +8,6 @@
 		<load-on-startup>1</load-on-startup>
 		<async-supported>true</async-supported>
 		<init-param>
-			<param-name>transportId</param-name>
-			<param-value>http://cxf.apache.org/transports/http/sse</param-value>
-		</init-param>
-		<init-param>
 			<param-name>javax.ws.rs.Application</param-name>
 			<param-value>org.apache.cxf.systest.jaxrs.sse.SseApplication</param-value>
 		</init-param>
diff --git a/systests/rs-sse/rs-sse-tomcat/src/test/java/org/apache/cxf/systest/jaxrs/sse/tomcat/AbstractTomcatServer.java b/systests/rs-sse/rs-sse-tomcat/src/test/java/org/apache/cxf/systest/jaxrs/sse/tomcat/AbstractTomcatServer.java
index ded2a64..799a134 100644
--- a/systests/rs-sse/rs-sse-tomcat/src/test/java/org/apache/cxf/systest/jaxrs/sse/tomcat/AbstractTomcatServer.java
+++ b/systests/rs-sse/rs-sse-tomcat/src/test/java/org/apache/cxf/systest/jaxrs/sse/tomcat/AbstractTomcatServer.java
@@ -30,7 +30,6 @@ import org.apache.catalina.startup.Tomcat;
 import org.apache.cxf.jaxrs.servlet.CXFNonSpringJaxrsServlet;
 import org.apache.cxf.systest.jaxrs.sse.BookStore;
 import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
-import org.apache.cxf.transport.sse.SseHttpTransportFactory;
 
 public abstract class AbstractTomcatServer extends AbstractBusTestServerBase {
 
@@ -60,8 +59,6 @@ public abstract class AbstractTomcatServer extends AbstractBusTestServerBase {
             if (resourcePath == null) {
                 final Context context = server.addContext("/", base.getAbsolutePath());
                 final Wrapper cxfServlet = Tomcat.addServlet(context, "cxfServlet", new CXFNonSpringJaxrsServlet());
-                cxfServlet.addInitParameter(CXFNonSpringJaxrsServlet.TRANSPORT_ID,
-                    SseHttpTransportFactory.TRANSPORT_ID);
                 cxfServlet.addInitParameter("jaxrs.serviceClasses", BookStore.class.getName());
                 cxfServlet.addInitParameter("jaxrs.providers", JacksonJsonProvider.class.getName());
                 cxfServlet.setAsyncSupported(true);
diff --git a/systests/rs-sse/rs-sse-tomcat/src/test/resources/jaxrs_sse/WEB-INF/web.xml b/systests/rs-sse/rs-sse-tomcat/src/test/resources/jaxrs_sse/WEB-INF/web.xml
index 64746acb..8133ebf 100644
--- a/systests/rs-sse/rs-sse-tomcat/src/test/resources/jaxrs_sse/WEB-INF/web.xml
+++ b/systests/rs-sse/rs-sse-tomcat/src/test/resources/jaxrs_sse/WEB-INF/web.xml
@@ -8,10 +8,6 @@
 		<load-on-startup>1</load-on-startup>
 		<async-supported>true</async-supported>
 		<init-param>
-			<param-name>transportId</param-name>
-			<param-value>http://cxf.apache.org/transports/http/sse</param-value>
-		</init-param>
-		<init-param>
 			<param-name>javax.ws.rs.Application</param-name>
 			<param-value>org.apache.cxf.systest.jaxrs.sse.SseApplication</param-value>
 		</init-param>
diff --git a/systests/rs-sse/rs-sse-undertow/src/test/java/org/apache/cxf/systest/jaxrs/sse/undertow/AbstractUndertowServer.java b/systests/rs-sse/rs-sse-undertow/src/test/java/org/apache/cxf/systest/jaxrs/sse/undertow/AbstractUndertowServer.java
index c6f0a8d..c24dfef 100644
--- a/systests/rs-sse/rs-sse-undertow/src/test/java/org/apache/cxf/systest/jaxrs/sse/undertow/AbstractUndertowServer.java
+++ b/systests/rs-sse/rs-sse-undertow/src/test/java/org/apache/cxf/systest/jaxrs/sse/undertow/AbstractUndertowServer.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
 import org.apache.cxf.jaxrs.servlet.CXFNonSpringJaxrsServlet;
 import org.apache.cxf.systest.jaxrs.sse.BookStore;
 import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
-import org.apache.cxf.transport.sse.SseHttpTransportFactory;
 
 import io.undertow.Handlers;
 import io.undertow.Undertow;
@@ -54,7 +53,6 @@ public abstract class AbstractUndertowServer extends AbstractBusTestServerBase {
                 .setDeploymentName("sse-test")
                 .addServlets(
                     servlet("MessageServlet", CXFNonSpringJaxrsServlet.class)
-                        .addInitParam(CXFNonSpringJaxrsServlet.TRANSPORT_ID, SseHttpTransportFactory.TRANSPORT_ID)
                         .addInitParam("jaxrs.providers", JacksonJsonProvider.class.getName())
                         .addInitParam("jaxrs.serviceClasses", BookStore.class.getName())
                         .setAsyncSupported(true)

-- 
To stop receiving notification emails like this one, please contact
reta@apache.org.