You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@cxf.apache.org by GitBox <gi...@apache.org> on 2018/05/19 16:38:56 UTC

[GitHub] reta closed pull request #415: Reimplementing SSE using AsyncContext (Servlet 3.0+)

reta closed pull request #415: Reimplementing SSE using AsyncContext (Servlet 3.0+)
URL: https://github.com/apache/cxf/pull/415
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff of a foreign pull request (one opened
from a fork) once it has been merged, the diff is reproduced below for
the sake of provenance:

diff --git a/distribution/src/main/release/samples/jax_rs/sse_cdi/pom.xml b/distribution/src/main/release/samples/jax_rs/sse_cdi/pom.xml
index 44564aa9d7d..16eacbe1071 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_cdi/pom.xml
+++ b/distribution/src/main/release/samples/jax_rs/sse_cdi/pom.xml
@@ -88,11 +88,6 @@
             <artifactId>websocket-server</artifactId>
             <version>${cxf.jetty9.version}</version>
         </dependency>
-
-        <dependency>
-            <groupId>org.atmosphere</groupId>
-            <artifactId>atmosphere-runtime</artifactId>
-        </dependency>
         
         <dependency>
             <groupId>io.reactivex.rxjava2</groupId>
diff --git a/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/java/demo/jaxrs/sse/StatsServer.java b/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/java/demo/jaxrs/sse/StatsServer.java
index f13318db43c..dde5496211d 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/java/demo/jaxrs/sse/StatsServer.java
+++ b/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/java/demo/jaxrs/sse/StatsServer.java
@@ -19,7 +19,6 @@
 package demo.jaxrs.sse;
 
 import org.apache.cxf.cdi.CXFCdiServlet;
-import org.apache.cxf.transport.sse.SseHttpTransportFactory;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.handler.HandlerList;
 import org.eclipse.jetty.servlet.DefaultServlet;
@@ -44,7 +43,6 @@ public static void main(final String[] args) throws Exception {
          // Register and map the dispatcher servlet
         final CXFCdiServlet cxfServlet = new CXFCdiServlet();
         final ServletHolder cxfServletHolder = new ServletHolder(cxfServlet);
-        cxfServletHolder.setInitParameter(CXFCdiServlet.TRANSPORT_ID, SseHttpTransportFactory.TRANSPORT_ID);
         final ServletContextHandler context = new ServletContextHandler();
         context.setContextPath("/");
         context.addEventListener(new Listener());
diff --git a/distribution/src/main/release/samples/jax_rs/sse_client/pom.xml b/distribution/src/main/release/samples/jax_rs/sse_client/pom.xml
index d8301dc79b6..5a9fe3ff6f5 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_client/pom.xml
+++ b/distribution/src/main/release/samples/jax_rs/sse_client/pom.xml
@@ -53,11 +53,6 @@
             <groupId>io.undertow</groupId>
             <artifactId>undertow-servlet</artifactId>
         </dependency>
-
-        <dependency>
-            <groupId>org.atmosphere</groupId>
-            <artifactId>atmosphere-runtime</artifactId>
-        </dependency>
     </dependencies>
     
     <profiles>
diff --git a/distribution/src/main/release/samples/jax_rs/sse_client/src/main/java/demo/jaxrs/sse/StatsServer.java b/distribution/src/main/release/samples/jax_rs/sse_client/src/main/java/demo/jaxrs/sse/StatsServer.java
index 77d847e9810..9f0c38fe323 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_client/src/main/java/demo/jaxrs/sse/StatsServer.java
+++ b/distribution/src/main/release/samples/jax_rs/sse_client/src/main/java/demo/jaxrs/sse/StatsServer.java
@@ -21,7 +21,6 @@
 import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
 
 import org.apache.cxf.jaxrs.servlet.CXFNonSpringJaxrsServlet;
-import org.apache.cxf.transport.sse.SseHttpTransportFactory;
 
 import io.undertow.Handlers;
 import io.undertow.Undertow;
@@ -44,7 +43,6 @@ public static void main(final String[] args) throws Exception {
             .setDeploymentName("sse-demo")
             .addServlets(
                 servlet("MessageServlet", CXFNonSpringJaxrsServlet.class)
-                    .addInitParam(CXFNonSpringJaxrsServlet.TRANSPORT_ID, SseHttpTransportFactory.TRANSPORT_ID)
                     .addInitParam("jaxrs.providers", JacksonJsonProvider.class.getName())
                     .addInitParam("jaxrs.serviceClasses", StatsRestServiceImpl.class.getName())
                     .setAsyncSupported(true)
diff --git a/distribution/src/main/release/samples/jax_rs/sse_osgi/pom.xml b/distribution/src/main/release/samples/jax_rs/sse_osgi/pom.xml
index d77b806975d..fdfedfe5a2b 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_osgi/pom.xml
+++ b/distribution/src/main/release/samples/jax_rs/sse_osgi/pom.xml
@@ -33,7 +33,6 @@
     <properties>
         <cxf.version>${project.version}</cxf.version>
         <!-- TODO remove these local entries after making the referenced dependency managed in parent/pom.xml -->
-        <cxf.atmosphere.version>2.4.3</cxf.atmosphere.version>
         <cxf.jetty92.version>9.2.15.v20160210</cxf.jetty92.version>
         <cxf.jetty93.version>9.3.5.v20151012</cxf.jetty93.version>
         <cxf.jetty.version>${cxf.jetty93.version}</cxf.jetty.version>
@@ -93,10 +92,5 @@
             <groupId>com.fasterxml.jackson.jaxrs</groupId>
             <artifactId>jackson-jaxrs-json-provider</artifactId>
         </dependency>
-        <!-- add atmosphere -->
-        <dependency>
-            <groupId>org.atmosphere</groupId>
-            <artifactId>atmosphere-runtime</artifactId>
-        </dependency>
     </dependencies>
 </project>
diff --git a/distribution/src/main/release/samples/jax_rs/sse_osgi/src/main/resources/OSGI-INF/blueprint/context.xml b/distribution/src/main/release/samples/jax_rs/sse_osgi/src/main/resources/OSGI-INF/blueprint/context.xml
index c53b4ebfe3f..36d885f8c0e 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_osgi/src/main/resources/OSGI-INF/blueprint/context.xml
+++ b/distribution/src/main/release/samples/jax_rs/sse_osgi/src/main/resources/OSGI-INF/blueprint/context.xml
@@ -31,6 +31,7 @@
     <!-- Application resources -->
     <bean id="statsResource" class="demo.jaxrs.server.StatsRestServiceImpl" />
     <bean id="jsonProvider" class="com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider" />
+    <bean id="sseFeature" class="org.apache.cxf.jaxrs.sse.SseFeature" />
  
     <cxf:bus>
         <cxf:features>
@@ -38,14 +39,16 @@
         </cxf:features>
     </cxf:bus>
 
-    <jaxrs:server id="sseSampleService" address="/" transportId="http://cxf.apache.org/transports/http/sse">
+    <jaxrs:server id="sseSampleService" address="/">
         <jaxrs:serviceBeans>
             <ref component-id="statsResource" />
         </jaxrs:serviceBeans>
         <jaxrs:providers>
-            <ref component-id="corsFilter" />
             <ref component-id="jsonProvider" />
         </jaxrs:providers>
+        <jaxrs:features>
+            <ref component-id="sseFeature" />
+        </jaxrs:features>
     </jaxrs:server>
 
 </blueprint>
diff --git a/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsConfig.java b/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsConfig.java
index 338005079c6..c327a08520b 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsConfig.java
+++ b/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsConfig.java
@@ -26,7 +26,6 @@
 import org.apache.cxf.bus.spring.SpringBus;
 import org.apache.cxf.endpoint.Server;
 import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
-import org.apache.cxf.transport.sse.SseHttpTransportFactory;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.ComponentScan;
 import org.springframework.context.annotation.Configuration;
@@ -50,7 +49,6 @@ Server jaxRsServer() {
             .createEndpoint(new StatsApplication(), JAXRSServerFactoryBean.class);
         factory.setServiceBean(statsRestService);
         factory.setProvider(new JacksonJsonProvider());
-        factory.setTransportId(SseHttpTransportFactory.TRANSPORT_ID);
         return factory.create();
     }
 }
diff --git a/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsServer.java b/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsServer.java
index 48960457136..9e23afc2032 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsServer.java
+++ b/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsServer.java
@@ -19,7 +19,6 @@
 package demo.jaxrs.sse;
 
 import org.apache.cxf.transport.servlet.CXFServlet;
-import org.apache.cxf.transport.sse.SseHttpTransportFactory;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.handler.HandlerList;
 import org.eclipse.jetty.servlet.DefaultServlet;
@@ -43,7 +42,6 @@ public static void main(final String[] args) throws Exception {
 
          // Register and map the dispatcher servlet
         final ServletHolder cxfServletHolder = new ServletHolder(new CXFServlet());
-        cxfServletHolder.setInitParameter(CXFServlet.TRANSPORT_ID, SseHttpTransportFactory.TRANSPORT_ID);
         final ServletContextHandler context = new ServletContextHandler();
         context.setContextPath("/");
         context.addEventListener(new ContextLoaderListener());
diff --git a/distribution/src/main/release/samples/jax_rs/sse_tomcat/pom.xml b/distribution/src/main/release/samples/jax_rs/sse_tomcat/pom.xml
index d732846b839..c5ca9d0c465 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_tomcat/pom.xml
+++ b/distribution/src/main/release/samples/jax_rs/sse_tomcat/pom.xml
@@ -90,11 +90,6 @@
             <artifactId>tomcat-embed-websocket</artifactId>
             <version>${cxf.tomcat.version}</version>
         </dependency>
-
-        <dependency>
-            <groupId>org.atmosphere</groupId>
-            <artifactId>atmosphere-runtime</artifactId>
-        </dependency>
     </dependencies>
     
     <profiles>
diff --git a/distribution/src/main/release/samples/jax_rs/sse_tomcat/src/main/java/demo/jaxrs/sse/StatsServer.java b/distribution/src/main/release/samples/jax_rs/sse_tomcat/src/main/java/demo/jaxrs/sse/StatsServer.java
index 6adad0b8851..b98af99ca04 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_tomcat/src/main/java/demo/jaxrs/sse/StatsServer.java
+++ b/distribution/src/main/release/samples/jax_rs/sse_tomcat/src/main/java/demo/jaxrs/sse/StatsServer.java
@@ -29,7 +29,6 @@
 import org.apache.catalina.webresources.DirResourceSet;
 import org.apache.catalina.webresources.StandardRoot;
 import org.apache.cxf.transport.servlet.CXFServlet;
-import org.apache.cxf.transport.sse.SseHttpTransportFactory;
 import org.springframework.web.context.ContextLoaderListener;
 
 public final class StatsServer {
@@ -51,7 +50,6 @@ public static void main(final String[] args) throws Exception {
         context.setResources(resourcesFrom(context, "target/classes"));
 
         final Wrapper cxfServlet = Tomcat.addServlet(context, "cxfServlet", new CXFServlet());
-        cxfServlet.addInitParameter(CXFServlet.TRANSPORT_ID, SseHttpTransportFactory.TRANSPORT_ID);
         cxfServlet.setAsyncSupported(true);
         context.addServletMapping("/rest/*", "cxfServlet");
 
diff --git a/osgi/karaf/features/src/main/resources/features.xml b/osgi/karaf/features/src/main/resources/features.xml
index 87fe948c608..973a7907073 100644
--- a/osgi/karaf/features/src/main/resources/features.xml
+++ b/osgi/karaf/features/src/main/resources/features.xml
@@ -405,7 +405,6 @@
     <feature name="cxf-sse" version="${project.version}">
         <feature version="${project.version}">cxf-http</feature>
         <feature version="${project.version}">cxf-jaxrs</feature>
-        <bundle dependency='true'>mvn:org.atmosphere/atmosphere-runtime/${cxf.atmosphere.version}</bundle>
         <bundle start-level="40">mvn:org.apache.cxf/cxf-rt-rs-sse/${project.version}</bundle>
         <capability>
             cxf.http.provider;name=sse
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java
index 3da5e54538d..eb3a8d2803c 100644
--- a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java
@@ -261,7 +261,7 @@ public void onDisconnect() {
     }
 
     public synchronized boolean suspendContinuationIfNeeded() {
-        if (!resumedByApplication && !cont.isPending() && !cont.isResumed()) {
+        if (!resumedByApplication && !isDone() && !cont.isPending() && !cont.isResumed()) {
             cont.suspend(AsyncResponse.NO_TIMEOUT);
             initialSuspend = false;
             return true;
diff --git a/rt/rs/sse/pom.xml b/rt/rs/sse/pom.xml
index 3da405f4938..58ae3af61cf 100644
--- a/rt/rs/sse/pom.xml
+++ b/rt/rs/sse/pom.xml
@@ -57,10 +57,6 @@
             <artifactId>${cxf.servlet-api.artifact}</artifactId>
             <scope>provided</scope>
         </dependency>
-        <dependency>
-            <groupId>org.atmosphere</groupId>
-            <artifactId>atmosphere-runtime</artifactId>
-        </dependency>
         <dependency>
             <groupId>org.mockito</groupId>
             <artifactId>mockito-core</artifactId>
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventBodyWriter.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventBodyWriter.java
index a97020f8e8a..28ac1a4a447 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventBodyWriter.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventBodyWriter.java
@@ -112,6 +112,8 @@ public void writeTo(OutboundSseEvent p, Class<?> cls, Type t, Annotation[] anns,
             writePayloadTo(payloadClass, payloadType, anns, p.getMediaType(), headers, p.getData(), os);
             os.write(NEW_LINE);
         }
+
+        os.write(NEW_LINE);
     }
 
     @SuppressWarnings("unchecked")
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
index 38842540822..7a96bc7060c 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
@@ -18,37 +18,67 @@
  */
 package org.apache.cxf.jaxrs.sse;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
+import javax.servlet.AsyncContext;
+import javax.servlet.AsyncEvent;
+import javax.servlet.AsyncListener;
 import javax.ws.rs.sse.OutboundSseEvent;
 import javax.ws.rs.sse.SseBroadcaster;
 import javax.ws.rs.sse.SseEventSink;
 
 public class SseBroadcasterImpl implements SseBroadcaster {
     private final Set<SseEventSink> subscribers = new CopyOnWriteArraySet<>();
-
-    private final Set<Consumer<SseEventSink>> closers =
-            new CopyOnWriteArraySet<>();
-
-    private final Set<BiConsumer<SseEventSink, Throwable>> exceptioners =
-            new CopyOnWriteArraySet<>();
+    private final Set<Consumer<SseEventSink>> closers = new CopyOnWriteArraySet<>();
+    private final Set<BiConsumer<SseEventSink, Throwable>> exceptioners = new CopyOnWriteArraySet<>();
+    private final AtomicBoolean closed = new AtomicBoolean(false);
 
     @Override
     public void register(SseEventSink sink) {
+        assertNotClosed();
+
+        final SseEventSinkImpl sinkImpl = (SseEventSinkImpl)sink;
+        final AsyncContext ctx = sinkImpl.getAsyncContext();
+
+        ctx.addListener(new AsyncListener() {
+            @Override
+            public void onComplete(AsyncEvent asyncEvent) throws IOException {
+                subscribers.remove(sink);
+            }
+
+            @Override
+            public void onTimeout(AsyncEvent asyncEvent) throws IOException {
+                subscribers.remove(sink);
+            }
+
+            @Override
+            public void onError(AsyncEvent asyncEvent) throws IOException {
+                subscribers.remove(sink);
+            }
+
+            @Override
+            public void onStartAsync(AsyncEvent asyncEvent) throws IOException {
+
+            }
+        });
+
         subscribers.add(sink);
     }
 
     @Override
     public CompletionStage<?> broadcast(OutboundSseEvent event) {
+        assertNotClosed();
+
         final Collection<CompletableFuture<?>> futures = new ArrayList<>();
-        
         for (SseEventSink sink: subscribers) {
             try {
                 futures.add(sink.send(event).toCompletableFuture());
@@ -62,19 +92,29 @@ public void register(SseEventSink sink) {
 
     @Override
     public void onClose(Consumer<SseEventSink> subscriber) {
+        assertNotClosed();
         closers.add(subscriber);
     }
 
     @Override
     public void onError(BiConsumer<SseEventSink, Throwable> exceptioner) {
+        assertNotClosed();
         exceptioners.add(exceptioner);
     }
 
     @Override
     public void close() {
-        subscribers.forEach(subscriber -> {
-            subscriber.close();
-            closers.forEach(closer -> closer.accept(subscriber));
-        });
+        if (closed.compareAndSet(false, true)) {
+            subscribers.forEach(subscriber -> {
+                subscriber.close();
+                closers.forEach(closer -> closer.accept(subscriber));
+            });
+        }
+    }
+
+    private void assertNotClosed() {
+        if (closed.get()) {
+            throw new IllegalStateException("The SSE broadcaster is already closed");
+        }
     }
 }
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkContextProvider.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventSinkContextProvider.java
similarity index 64%
rename from rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkContextProvider.java
rename to rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventSinkContextProvider.java
index 8d87a3dbefb..67e9c54c2e8 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkContextProvider.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventSinkContextProvider.java
@@ -16,22 +16,24 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.cxf.jaxrs.sse.atmosphere;
+
+
+package org.apache.cxf.jaxrs.sse;
 
 import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.ext.MessageBodyWriter;
 import javax.ws.rs.sse.OutboundSseEvent;
 import javax.ws.rs.sse.SseEventSink;
 
 import org.apache.cxf.jaxrs.ext.ContextProvider;
+import org.apache.cxf.jaxrs.impl.AsyncResponseImpl;
 import org.apache.cxf.jaxrs.provider.ServerProviderFactory;
-import org.apache.cxf.jaxrs.sse.OutboundSseEventBodyWriter;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.transport.http.AbstractHTTPDestination;
-import org.atmosphere.cpr.AtmosphereResource;
-import org.atmosphere.cpr.Broadcaster;
 
-public class SseAtmosphereEventSinkContextProvider implements ContextProvider<SseEventSink> {
+public class SseEventSinkContextProvider implements ContextProvider<SseEventSink> {
+
     @Override
     public SseEventSink createContext(Message message) {
         final HttpServletRequest request = (HttpServletRequest)message.get(AbstractHTTPDestination.HTTP_REQUEST);
@@ -39,23 +41,10 @@ public SseEventSink createContext(Message message) {
             throw new IllegalStateException("Unable to retrieve HTTP request from the context");
         }
 
-        final AtmosphereResource resource = (AtmosphereResource)request
-            .getAttribute(AtmosphereResource.class.getName());
-        if (resource == null) {
-            throw new IllegalStateException("AtmosphereResource is not present, "
-                    + "is AtmosphereServlet configured properly?");
-        }
-
-        final Broadcaster broadcaster = resource.getAtmosphereConfig()
-            .getBroadcasterFactory()
-            .lookup(resource.uuid(), true);
-
-        resource.removeFromAllBroadcasters();
-        resource.setBroadcaster(broadcaster);
-
         final MessageBodyWriter<OutboundSseEvent> writer = new OutboundSseEventBodyWriter(
             ServerProviderFactory.getInstance(message), message.getExchange());
 
-        return new SseAtmosphereEventSinkImpl(writer, resource);
+        final AsyncResponse async = new AsyncResponseImpl(message);
+        return new SseEventSinkImpl(writer, async, request.getAsyncContext());
     }
-}
+}
\ No newline at end of file
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventSinkImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventSinkImpl.java
new file mode 100644
index 00000000000..f39de07bfd3
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventSinkImpl.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.jaxrs.sse;
+
+import java.lang.annotation.Annotation;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.LockSupport;
+import java.util.logging.Logger;
+
+import javax.servlet.AsyncContext;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.ext.MessageBodyWriter;
+import javax.ws.rs.sse.OutboundSseEvent;
+import javax.ws.rs.sse.SseEventSink;
+
+import org.apache.cxf.common.logging.LogUtils;
+
+public class SseEventSinkImpl implements SseEventSink {
+    private static final Annotation[] EMPTY_ANNOTATIONS = new Annotation [] {};
+    private static final Logger LOG = LogUtils.getL7dLogger(SseEventSinkImpl.class);
+    private static final int BUFFER_SIZE = 10000; // buffering 10000 messages
+
+    private final AsyncContext ctx;
+    private final MessageBodyWriter<OutboundSseEvent> writer;
+    private final Queue<QueuedEvent> buffer;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final AtomicBoolean dispatching = new AtomicBoolean(false);
+
+    public SseEventSinkImpl(final MessageBodyWriter<OutboundSseEvent> writer, 
+            final AsyncResponse async, final AsyncContext ctx) {
+        
+        this.writer = writer;
+        this.buffer = new ArrayBlockingQueue<>(BUFFER_SIZE);
+        this.ctx = ctx;
+
+        if (ctx == null) {
+            throw new IllegalStateException("Unable to retrieve the AsyncContext for this request. "
+                + "Is the Servlet configured properly?");
+        }
+
+        ctx.getResponse().setContentType(OutboundSseEventBodyWriter.SERVER_SENT_EVENTS);
+    }
+
+    public AsyncContext getAsyncContext() {
+        return ctx;
+    }
+
+    @Override
+    public void close() {
+        if (closed.compareAndSet(false, true)) {
+            // In case we are still dispatching, give the events a chance to be
+            // sent over to the consumers. A good example would be a send(event) call
+            // immediately followed by a close() call.
+            if (!awaitQueueToDrain(5, TimeUnit.SECONDS)) {
+                LOG.warning("There are still SSE events the queue which may not be delivered (closing now)");
+            }
+            
+            try {
+                ctx.complete();
+            } catch (final IllegalStateException ex) {
+                LOG.warning("Failed to close the AsyncContext cleanly: " + ex.getMessage());
+            }
+        }
+    }
+
+    private boolean awaitQueueToDrain(int timeout, TimeUnit unit) {
+        final long parkTime = unit.toNanos(timeout) / 20;
+        int attempt = 0;
+        
+        while (dispatching.get() && ++attempt < 20) {
+            LockSupport.parkNanos(parkTime);
+        }
+        
+        return buffer.isEmpty();
+    }
+
+    @Override
+    public boolean isClosed() {
+        return closed.get();
+    }
+
+    @Override
+    public CompletionStage<?> send(OutboundSseEvent event) {
+        final CompletableFuture<?> future = new CompletableFuture<>();
+
+        if (!closed.get() && writer != null) {
+            if (buffer.offer(new QueuedEvent(event, future))) {
+                if (dispatching.compareAndSet(false, true)) {
+                    ctx.start(this::dequeue);
+                }
+            } else {
+                future.completeExceptionally(new IllegalStateException(
+                    "The buffer is full (10000), unable to queue SSE event for send"));
+            }
+        } else {
+            future.completeExceptionally(new IllegalStateException(
+                "The sink is already closed, unable to queue SSE event for send"));
+        }
+
+        return future;
+    }
+
+    private void dequeue() {
+        try {
+            while (true) {
+                final QueuedEvent qeuedEvent = buffer.poll();
+                
+                // Nothing queued, release the thread
+                if (qeuedEvent == null) {
+                    break;
+                }
+                
+                final OutboundSseEvent event = qeuedEvent.event;
+                final CompletableFuture<?> future = qeuedEvent.completion;
+    
+                try {
+                    writer.writeTo(event, event.getClass(), event.getGenericType(), EMPTY_ANNOTATIONS,
+                        event.getMediaType(), null, ctx.getResponse().getOutputStream());
+                    ctx.getResponse().flushBuffer();
+                    future.complete(null);
+                } catch (final Exception ex) {
+                    future.completeExceptionally(ex);
+                }
+            }
+        } finally {
+            dispatching.set(false);
+        }
+    }
+
+    private static class QueuedEvent {
+        private final OutboundSseEvent event;
+        private final CompletableFuture<?> completion;
+
+        QueuedEvent(OutboundSseEvent event, CompletableFuture<?> completion) {
+            this.event = event;
+            this.completion = completion;
+        }
+    }
+}
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java
index 36ea9ede1f1..9ecbe27c64e 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java
@@ -28,7 +28,6 @@
 import org.apache.cxf.endpoint.Server;
 import org.apache.cxf.feature.AbstractFeature;
 import org.apache.cxf.jaxrs.provider.ServerProviderFactory;
-import org.apache.cxf.jaxrs.sse.atmosphere.SseAtmosphereEventSinkContextProvider;
 
 @Provider(value = Type.Feature, scope = Scope.Server)
 public class SseFeature extends AbstractFeature {
@@ -36,8 +35,8 @@
     public void initialize(Server server, Bus bus) {
         final List<Object> providers = new ArrayList<>();
 
-        providers.add(new SseAtmosphereEventSinkContextProvider());
         providers.add(new SseContextProvider());
+        providers.add(new SseEventSinkContextProvider());
 
         ((ServerProviderFactory) server.getEndpoint().get(
             ServerProviderFactory.class.getName())).setUserProviders(providers);
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkImpl.java
deleted file mode 100644
index af984e79daf..00000000000
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkImpl.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.sse.atmosphere;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.lang.annotation.Annotation;
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.logging.Logger;
-
-import javax.ws.rs.ext.MessageBodyWriter;
-import javax.ws.rs.sse.OutboundSseEvent;
-import javax.ws.rs.sse.SseEventSink;
-
-import org.apache.cxf.common.logging.LogUtils;
-import org.atmosphere.cpr.AtmosphereResource;
-import org.atmosphere.cpr.AtmosphereResponse;
-import org.atmosphere.cpr.Broadcaster;
-
-import static org.atmosphere.cpr.ApplicationConfig.PROPERTY_USE_STREAM;
-
-public class SseAtmosphereEventSinkImpl implements SseEventSink {
-    private static final Logger LOG = LogUtils.getL7dLogger(SseAtmosphereEventSinkImpl.class);
-
-    private final AtmosphereResource resource;
-    private final MessageBodyWriter<OutboundSseEvent> writer;
-    private final boolean usingStream;
-    
-    private volatile boolean closed;
-
-    public SseAtmosphereEventSinkImpl(final MessageBodyWriter<OutboundSseEvent> writer,
-            final AtmosphereResource resource) {
-        this.writer = writer;
-        this.resource = resource;
-        this.usingStream = (Boolean)resource.getRequest().getAttribute(PROPERTY_USE_STREAM);
-
-        if (!resource.isSuspended()) {
-            LOG.fine("Atmosphere resource is not suspended, suspending");
-            resource.suspend();
-        }
-    }
-
-    @Override
-    public void close() {
-        if (!closed) {
-            closed = true;
-
-            LOG.fine("Closing Atmosphere SSE event output");
-            if (resource.isSuspended()) {
-                LOG.fine("Atmosphere resource is suspended, resuming");
-                resource.resume();
-            }
-
-            final Broadcaster broadcaster = resource.getBroadcaster();
-            resource.removeFromAllBroadcasters();
-
-            try {
-                final AtmosphereResponse response = resource.getResponse();
-                
-                try {
-                    // The resource's request property is null here, we have to
-                    // rely on initial request to understand what to close, response
-                    // or stream.
-                    if (usingStream) {
-                        response.getOutputStream().close();
-                    } else {
-                        response.getWriter().close();
-                    }                    
-                } catch (final IOException ex) {
-                    LOG.warning("Failed to flush AtmosphereResponse buffer: "
-                        + ex.getMessage());
-                }
-            } finally {
-                try {
-                    resource.close();
-                } catch (IOException ex) {
-                    // ignore
-                }
-                broadcaster.destroy();
-                LOG.fine("Atmosphere SSE event output is closed");
-            }
-        }
-    }
-
-    @Override
-    public CompletionStage<?> send(OutboundSseEvent event) {
-        final CompletableFuture<?> future = new CompletableFuture<>();
-        
-        if (!closed && writer != null) {
-            try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
-                writer.writeTo(event, event.getClass(), null, new Annotation [] {}, event.getMediaType(), null, os);
-
-                // Atmosphere broadcasts asynchronously which is acceptable in most cases.
-                // Unfortunately, calling close() may lead to response stream being closed
-                // while there are still some SSE delivery scheduled.
-                return CompletableFuture.completedFuture(
-                    resource
-                        .getBroadcaster()
-                        .broadcast(os.toString(StandardCharsets.UTF_8.name()))
-                        .get(1, TimeUnit.SECONDS)
-                    );
-            } catch (final IOException ex) {
-                LOG.warning("While writing the SSE event, an exception was raised: " + ex);
-                future.completeExceptionally(ex);
-            } catch (final ExecutionException | InterruptedException ex) {
-                LOG.warning("SSE Atmosphere response was not delivered");
-                future.completeExceptionally(ex);
-            } catch (final TimeoutException ex) {
-                LOG.warning("SSE Atmosphere response was not delivered within default timeout");
-                future.completeExceptionally(ex);
-            }
-        } else {
-            future.complete(null);
-        }
-        
-        return future;
-    }
-
-    @Override
-    public boolean isClosed() {
-        return closed;
-    }
-}
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptor.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptor.java
deleted file mode 100644
index ab8ef5c5ee8..00000000000
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptor.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.sse.atmosphere;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.PrintWriter;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.cxf.common.logging.LogUtils;
-import org.atmosphere.cpr.Action;
-import org.atmosphere.cpr.AsyncIOInterceptorAdapter;
-import org.atmosphere.cpr.AsyncIOWriter;
-import org.atmosphere.cpr.AtmosphereInterceptorWriter;
-import org.atmosphere.cpr.AtmosphereRequest;
-import org.atmosphere.cpr.AtmosphereResource;
-import org.atmosphere.cpr.AtmosphereResourceEvent;
-import org.atmosphere.cpr.AtmosphereResourceEventListenerAdapter.OnPreSuspend;
-import org.atmosphere.cpr.AtmosphereResponse;
-import org.atmosphere.interceptor.AllowInterceptor;
-import org.atmosphere.interceptor.SSEAtmosphereInterceptor;
-import org.atmosphere.util.Utils;
-
-import static org.apache.cxf.jaxrs.sse.OutboundSseEventBodyWriter.SERVER_SENT_EVENTS;
-import static org.atmosphere.cpr.ApplicationConfig.PROPERTY_USE_STREAM;
-import static org.atmosphere.cpr.FrameworkConfig.CALLBACK_JAVASCRIPT_PROTOCOL;
-import static org.atmosphere.cpr.FrameworkConfig.CONTAINER_RESPONSE;
-
-/**
- * Most of this class implementation is borrowed from SSEAtmosphereInterceptor. The original
- * implementation does two things which do not fit well into SSE support:
- *  - closes the response stream (overridden by SseAtmosphereInterceptorWriter)
- *  - wraps the whatever object is being written to SSE payload (overridden using
- *    the complete SSE protocol)
- */
-public class SseAtmosphereInterceptor extends SSEAtmosphereInterceptor {
-    private static final Logger LOG = LogUtils.getL7dLogger(SseAtmosphereInterceptor.class);
-
-    private static final byte[] PADDING;
-    private static final String PADDING_TEXT;
-    private static final byte[] END = "\r\n\r\n".getBytes();
-
-    static {
-        StringBuffer whitespace = new StringBuffer();
-        for (int i = 0; i < 2000; i++) {
-            whitespace.append(" ");
-        }
-        whitespace.append("\n");
-        PADDING_TEXT = whitespace.toString();
-        PADDING = PADDING_TEXT.getBytes();
-    }
-
-    private boolean writePadding(AtmosphereResponse response) {
-        if (response.request() != null && response.request().getAttribute("paddingWritten") != null) {
-            return false;
-        }
-
-        response.setContentType(SERVER_SENT_EVENTS);
-        response.setCharacterEncoding("utf-8");
-        boolean isUsingStream = (Boolean) response.request().getAttribute(PROPERTY_USE_STREAM);
-        if (isUsingStream) {
-            try {
-                OutputStream stream = response.getResponse().getOutputStream();
-                try {
-                    stream.write(PADDING);
-                    stream.flush();
-                } catch (IOException ex) {
-                    LOG.log(Level.WARNING, "SSE may not work", ex);
-                }
-            } catch (IOException e) {
-                LOG.log(Level.FINEST, "", e);
-            }
-        } else {
-            try {
-                PrintWriter w = response.getResponse().getWriter();
-                w.println(PADDING_TEXT);
-                w.flush();
-            } catch (IOException e) {
-                LOG.log(Level.FINEST, "", e);
-            }
-        }
-        response.resource().getRequest().setAttribute("paddingWritten", "true");
-        return true;
-    }
-
-    @Override
-    public Action inspect(final AtmosphereResource r) {
-        if (Utils.webSocketMessage(r)) {
-            return Action.CONTINUE;
-        }
-
-        final AtmosphereRequest request = r.getRequest();
-        final String accept = request.getHeader("Accept") == null ? "text/plain" : request.getHeader("Accept").trim();
-
-        if (r.transport().equals(AtmosphereResource.TRANSPORT.SSE) || SERVER_SENT_EVENTS.equalsIgnoreCase(accept)) {
-            final AtmosphereResponse response = r.getResponse();
-            if (response.getAsyncIOWriter() == null) {
-                response.asyncIOWriter(new SseAtmosphereInterceptorWriter());
-            }
-
-            r.addEventListener(new P(response));
-
-            AsyncIOWriter writer = response.getAsyncIOWriter();
-            if (AtmosphereInterceptorWriter.class.isAssignableFrom(writer.getClass())) {
-                AtmosphereInterceptorWriter.class.cast(writer).interceptor(new AsyncIOInterceptorAdapter() {
-                    private boolean padding() {
-                        if (!r.isSuspended()) {
-                            return writePadding(response);
-                        }
-                        return false;
-                    }
-
-                    @Override
-                    public void prePayload(AtmosphereResponse response, byte[] data, int offset, int length) {
-                        padding();
-                    }
-
-                    @Override
-                    public void postPayload(AtmosphereResponse response, byte[] data, int offset, int length) {
-                        // The CALLBACK_JAVASCRIPT_PROTOCOL may be called by a framework running on top of Atmosphere
-                        // In that case, we must pad/protocol indenendently of the state of the AtmosphereResource
-                        if (r.isSuspended() || r.getRequest().getAttribute(CALLBACK_JAVASCRIPT_PROTOCOL) != null
-                                || r.getRequest().getAttribute(CONTAINER_RESPONSE) != null) {
-                            response.write(END, true);
-                        }
-
-                        /**
-                         * When used with https://github.com/remy/polyfills/blob/master/EventSource.js , we
-                         * resume after every message.
-                         */
-                        String ua = r.getRequest().getHeader("User-Agent");
-                        if (ua != null && ua.contains("MSIE")) {
-                            try {
-                                response.flushBuffer();
-                            } catch (IOException e) {
-                                LOG.log(Level.FINEST, "", e);
-                            }
-                            r.resume();
-                        }
-                    }
-                });
-            } else {
-                LOG.warning(String.format("Unable to apply %s. Your AsyncIOWriter must implement %s",
-                    getClass().getName(), AtmosphereInterceptorWriter.class.getName()));
-            }
-        }
-
-        return Action.CONTINUE;
-    }
-
-    private final class P extends OnPreSuspend implements AllowInterceptor {
-
-        private final AtmosphereResponse response;
-
-        private P(AtmosphereResponse response) {
-            this.response = response;
-        }
-
-        @Override
-        public void onPreSuspend(AtmosphereResourceEvent event) {
-            writePadding(response);
-        }
-    }
-}
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptorWriter.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptorWriter.java
deleted file mode 100644
index cfafe870c3c..00000000000
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptorWriter.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.sse.atmosphere;
-
-import java.io.IOException;
-
-import org.atmosphere.cpr.AtmosphereInterceptorWriter;
-import org.atmosphere.cpr.AtmosphereResponse;
-
-public class SseAtmosphereInterceptorWriter extends AtmosphereInterceptorWriter {
-    @Override
-    public void close(AtmosphereResponse response) throws IOException {
-        // Do not close the response, keep output stream open
-    }
-}
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/ext/SseTransportCustomizationExtension.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/ext/SseTransportCustomizationExtension.java
index b0123d03ca3..c6267d49aae 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/ext/SseTransportCustomizationExtension.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/ext/SseTransportCustomizationExtension.java
@@ -18,24 +18,15 @@
  */
 package org.apache.cxf.jaxrs.sse.ext;
 
-import org.apache.cxf.Bus;
 import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
 import org.apache.cxf.jaxrs.ext.JAXRSServerFactoryCustomizationExtension;
 import org.apache.cxf.jaxrs.sse.SseContextProvider;
-import org.apache.cxf.jaxrs.sse.atmosphere.SseAtmosphereEventSinkContextProvider;
-import org.apache.cxf.transport.AbstractTransportFactory;
-import org.apache.cxf.transport.sse.SseHttpTransportFactory;
+import org.apache.cxf.jaxrs.sse.SseEventSinkContextProvider;
 
 public class SseTransportCustomizationExtension implements JAXRSServerFactoryCustomizationExtension {
     @Override
     public void customize(final JAXRSServerFactoryBean bean) {
-        bean.setTransportId(SseHttpTransportFactory.TRANSPORT_ID);
         bean.setProvider(new SseContextProvider());
-        bean.setProvider(new SseAtmosphereEventSinkContextProvider());
-        
-        final Bus bus = bean.getBus();
-        if (bus != null) {
-            bus.setProperty(AbstractTransportFactory.PREFERRED_TRANSPORT_ID, SseHttpTransportFactory.TRANSPORT_ID);
-        }
+        bean.setProvider(new SseEventSinkContextProvider());
     }
 }
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseDestinationFactory.java b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseDestinationFactory.java
deleted file mode 100644
index b1ca29d32d8..00000000000
--- a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseDestinationFactory.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.transport.sse;
-
-import java.io.IOException;
-import java.util.logging.Logger;
-
-import org.apache.cxf.Bus;
-import org.apache.cxf.common.logging.LogUtils;
-import org.apache.cxf.service.model.EndpointInfo;
-import org.apache.cxf.transport.DestinationFactory;
-import org.apache.cxf.transport.DestinationFactoryManager;
-import org.apache.cxf.transport.http.AbstractHTTPDestination;
-import org.apache.cxf.transport.http.DestinationRegistry;
-import org.apache.cxf.transport.http.HTTPTransportFactory;
-import org.apache.cxf.transport.http.HttpDestinationFactory;
-import org.apache.cxf.transport.sse.atmosphere.AtmosphereSseServletDestination;
-
-public class SseDestinationFactory implements HttpDestinationFactory {
-    private static final Logger LOG = LogUtils.getL7dLogger(SseDestinationFactory.class);
-    
-    @Override
-    public AbstractHTTPDestination createDestination(EndpointInfo endpointInfo, Bus bus, 
-            DestinationRegistry registry) throws IOException {
-        return new AtmosphereSseServletDestination(bus, getDestinationRegistry(bus), endpointInfo, 
-            endpointInfo.getAddress());
-    } 
-    
-    /**
-     * The destination factory should be taken from HTTP transport as a workaround to 
-     * register the destinations properly in the OSGi container.
-     */
-    private static DestinationRegistry getDestinationRegistry(Bus bus) {
-        DestinationFactoryManager dfm = bus.getExtension(DestinationFactoryManager.class);
-        
-        try {
-            DestinationFactory df = dfm.getDestinationFactory("http://cxf.apache.org/transports/http/configuration");
-            if (df instanceof HTTPTransportFactory) {
-                HTTPTransportFactory transportFactory = (HTTPTransportFactory)df;
-                return transportFactory.getRegistry();
-            }
-        } catch (final Exception ex) {
-            LOG.warning("Unable to deduct the destination registry from HTTP transport: " + ex.getMessage());
-        }
-        
-        return null;
-    }
-
-
-}
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseHttpTransportFactory.java b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseHttpTransportFactory.java
deleted file mode 100644
index a1f0d68ab0c..00000000000
--- a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseHttpTransportFactory.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.transport.sse;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.cxf.Bus;
-import org.apache.cxf.common.injection.NoJSR250Annotations;
-import org.apache.cxf.service.model.EndpointInfo;
-import org.apache.cxf.transport.ConduitInitiator;
-import org.apache.cxf.transport.Destination;
-import org.apache.cxf.transport.DestinationFactory;
-import org.apache.cxf.transport.http.AbstractHTTPDestination;
-import org.apache.cxf.transport.http.DestinationRegistry;
-import org.apache.cxf.transport.http.HTTPTransportFactory;
-
-@NoJSR250Annotations
-public class SseHttpTransportFactory extends HTTPTransportFactory
-        implements ConduitInitiator, DestinationFactory {
-
-    public static final String TRANSPORT_ID = "http://cxf.apache.org/transports/http/sse";
-    public static final List<String> DEFAULT_NAMESPACES = Collections.unmodifiableList(Arrays.asList(
-        TRANSPORT_ID,
-        "http://cxf.apache.org/transports/http/sse/configuration"
-    ));
-    
-    private final SseDestinationFactory factory = new SseDestinationFactory();
-
-    public SseHttpTransportFactory() {
-        this(null);
-    }
-
-    public SseHttpTransportFactory(DestinationRegistry registry) {
-        super(DEFAULT_NAMESPACES, registry);
-    }
-
-    @Override
-    public Destination getDestination(EndpointInfo endpointInfo, Bus bus) throws IOException {
-        if (endpointInfo == null) {
-            throw new IllegalArgumentException("EndpointInfo cannot be null");
-        }
-        
-        // In order to register the destination in the OSGi container, we have to 
-        // include it into 2 registries basically: for HTTP transport and the current 
-        // one. The workaround is borrow from org.apache.cxf.transport.websocket.WebSocketTransportFactory,
-        // it seems like no better option exists at the moment.
-        synchronized (registry) {
-            AbstractHTTPDestination d = registry.getDestinationForPath(endpointInfo.getAddress());
-            
-            if (d == null) {
-                d = factory.createDestination(endpointInfo, bus, registry);
-                if (d == null) {
-                    throw new IOException("No destination available. The CXF SSE transport needs Atmosphere"
-                        + " dependencies to be available");
-                }
-                registry.addDestination(d);
-                configure(bus, d);
-                d.finalizeConfig();
-            }
-            
-            return d;
-        }
-    }
-}
\ No newline at end of file
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseProvidersExtension.java b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseProvidersExtension.java
new file mode 100644
index 00000000000..0277611ff59
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseProvidersExtension.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.transport.sse;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.buslifecycle.BusCreationListener;
+import org.apache.cxf.helpers.CastUtils;
+import org.apache.cxf.jaxrs.sse.SseContextProvider;
+import org.apache.cxf.jaxrs.sse.SseEventSinkContextProvider;
+
+public class SseProvidersExtension implements BusCreationListener { // adds the SSE providers to a Bus in busCreated
+
+    private static final String BUS_PROVIDERS = "org.apache.cxf.jaxrs.bus.providers"; // Bus property key read and written below
+
+    @Override
+    public void busCreated(Bus bus) {
+        Object providers = bus.getProperty(BUS_PROVIDERS); // current value under the key, if any
+        
+        final List<?> sseProviders = // the two SSE context providers contributed by this extension
+            Arrays.asList(
+                new SseContextProvider(), 
+                new SseEventSinkContextProvider()
+            );
+        
+        if (providers instanceof List) { // a list is already set: copy it and append, so the original list is not mutated
+            final List<?> existing = new ArrayList<>((List<?>)providers);
+            existing.addAll(CastUtils.cast(sseProviders));
+            bus.setProperty(BUS_PROVIDERS, existing);
+        } else { // no List stored: the value is replaced by the fixed-size Arrays.asList list
+            bus.setProperty(BUS_PROVIDERS, sseProviders);
+        }
+    }
+    
+}
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/atmosphere/AtmosphereSseServletDestination.java b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/atmosphere/AtmosphereSseServletDestination.java
deleted file mode 100644
index add8afa08de..00000000000
--- a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/atmosphere/AtmosphereSseServletDestination.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cxf.transport.sse.atmosphere;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.List;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.servlet.ServletConfig;
-import javax.servlet.ServletContext;
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import javax.ws.rs.core.HttpHeaders;
-import javax.ws.rs.core.MediaType;
-
-import org.apache.cxf.Bus;
-import org.apache.cxf.common.logging.LogUtils;
-import org.apache.cxf.jaxrs.sse.SseFeature;
-import org.apache.cxf.jaxrs.sse.atmosphere.SseAtmosphereInterceptor;
-import org.apache.cxf.message.Message;
-import org.apache.cxf.service.model.EndpointInfo;
-import org.apache.cxf.transport.http.DestinationRegistry;
-import org.apache.cxf.transport.http.Headers;
-import org.apache.cxf.transport.servlet.ServletDestination;
-import org.atmosphere.cache.UUIDBroadcasterCache;
-import org.atmosphere.cpr.ApplicationConfig;
-import org.atmosphere.cpr.AtmosphereFramework;
-import org.atmosphere.cpr.AtmosphereRequestImpl;
-import org.atmosphere.cpr.AtmosphereResource;
-import org.atmosphere.cpr.AtmosphereResponseImpl;
-import org.atmosphere.handler.AbstractReflectorAtmosphereHandler;
-
-public class AtmosphereSseServletDestination extends ServletDestination {
-    private static final Logger LOG = LogUtils.getL7dLogger(AtmosphereSseServletDestination.class);
-
-    private AtmosphereFramework framework;
-
-    public AtmosphereSseServletDestination(Bus bus, DestinationRegistry registry,
-            EndpointInfo ei, String path) throws IOException {
-        super(bus, registry, ei, path);
-
-        framework = create();
-
-        bus.getFeatures().add(new SseFeature());
-    }
-
-    private AtmosphereFramework create() {
-        final AtmosphereFramework instance = new AtmosphereFramework(true, false);
-        
-        instance.interceptor(new SseAtmosphereInterceptor());
-        instance.addInitParameter(ApplicationConfig.PROPERTY_NATIVE_COMETSUPPORT, "true");
-        instance.addInitParameter(ApplicationConfig.WEBSOCKET_SUPPORT, "true");
-        instance.addInitParameter(ApplicationConfig.DISABLE_ATMOSPHEREINTERCEPTOR, "true");
-        instance.addInitParameter(ApplicationConfig.CLOSE_STREAM_ON_CANCEL, "true");
-        // Atmosphere does not limit amount of threads and application can crash in no time
-        // https://github.com/Atmosphere/atmosphere/wiki/Configuring-Atmosphere-for-Performance
-        instance.addInitParameter(ApplicationConfig.BROADCASTER_MESSAGE_PROCESSING_THREADPOOL_MAXSIZE, "20");
-        instance.addInitParameter(ApplicationConfig.BROADCASTER_ASYNC_WRITE_THREADPOOL_MAXSIZE, "20");
-        // workaround for atmosphere's jsr356 initialization requiring servletConfig
-        instance.addInitParameter(ApplicationConfig.WEBSOCKET_SUPPRESS_JSR356, "true");
-
-        instance.setBroadcasterCacheClassName(UUIDBroadcasterCache.class.getName());
-        instance.addAtmosphereHandler("/", new DestinationHandler());
-        
-        return instance;
-    }
-    
-    @Override
-    public void onServletConfigAvailable(ServletConfig config) throws ServletException {
-        // Very likely there is JSR-356 implementation available, let us reconfigure the Atmosphere framework
-        // to use it since ServletConfig instance is already available.
-        final Object container = config.getServletContext()
-            .getAttribute("javax.websocket.server.ServerContainer");
-
-        if (container != null) {
-            if (framework.initialized()) {
-                framework.destroy();
-            }
-            
-            framework = create();
-            framework.addInitParameter(ApplicationConfig.PROPERTY_NATIVE_COMETSUPPORT, "false");
-            framework.addInitParameter(ApplicationConfig.WEBSOCKET_SUPPRESS_JSR356, "false");
-            
-            framework.init(config);
-        }
-    }
-    
-    @Override
-    public void finalizeConfig() {
-        if (framework.initialized()) {
-            return;
-        }
-        
-        final ServletContext ctx = bus.getExtension(ServletContext.class);
-        if (ctx != null) {
-            try {
-                framework.init(new ServletConfig() {
-                    @Override
-                    public String getServletName() {
-                        return null;
-                    }
-                    @Override
-                    public ServletContext getServletContext() {
-                        return ctx;
-                    }
-                    @Override
-                    public String getInitParameter(String name) {
-                        return null;
-                    }
-
-                    @Override
-                    public Enumeration<String> getInitParameterNames() {
-                        return null;
-                    }
-                });
-            } catch (ServletException e) {
-                // TODO Auto-generated catch block
-                e.printStackTrace();
-            }
-        } else {
-            framework.init();
-        }
-    }
-
-    @Override
-    public void invoke(ServletConfig config, ServletContext context, HttpServletRequest req,
-            HttpServletResponse resp) throws IOException {
-        try {
-            framework.doCometSupport(AtmosphereRequestImpl.wrap(req), AtmosphereResponseImpl.wrap(resp));
-        } catch (ServletException e) {
-            throw new IOException(e);
-        }
-    }
-
-    @Override
-    public void shutdown() {
-        try {
-            framework.destroy();
-        } catch (Exception ex) {
-            LOG.warning("Graceful shutdown was not successful: " + ex.getMessage());
-        } finally {
-            super.shutdown();
-        }
-    }
-
-    private class DestinationHandler extends AbstractReflectorAtmosphereHandler {
-        @Override
-        public void onRequest(final AtmosphereResource resource) throws IOException {
-            LOG.fine("onRequest");
-            try {
-                AtmosphereSseServletDestination.super.invoke(null, resource.getRequest().getServletContext(),
-                    resource.getRequest(), resource.getResponse());
-            } catch (Exception e) {
-                LOG.log(Level.WARNING, "Failed to invoke service", e);
-            }
-        }
-    }
-
-    @Override
-    protected OutputStream flushHeaders(Message outMessage, boolean getStream) throws IOException {
-        adjustContentLength(outMessage);
-        return super.flushHeaders(outMessage, getStream);
-    }
-
-    @Override
-    protected OutputStream flushHeaders(Message outMessage) throws IOException {
-        adjustContentLength(outMessage);
-        return super.flushHeaders(outMessage);
-    }
-
-    /**
-     * It has been noticed that Jetty checks the "Content-Length" header and completes the
-     * response if its value is 0 (or matches the number of bytes written). However, in case
-     * of SSE the content length is unknown so we are setting it to -1 before flushing the
-     * response. Otherwise, only the first event is going to be sent and response is going to
-     * be closed.
-     */
-    private void adjustContentLength(Message outMessage) {
-        final String contentType = (String)outMessage.get(Message.CONTENT_TYPE);
-
-        if (MediaType.SERVER_SENT_EVENTS.equalsIgnoreCase(contentType)) {
-            final Map<String, List<String>> headers = Headers.getSetProtocolHeaders(outMessage);
-            headers.put(HttpHeaders.CONTENT_LENGTH, Collections.singletonList("-1"));
-        }
-    }
-}
diff --git a/rt/rs/sse/src/main/resources/META-INF/cxf/bus-extensions.txt b/rt/rs/sse/src/main/resources/META-INF/cxf/bus-extensions.txt
index 643b51c04e9..97a6b9f38b5 100644
--- a/rt/rs/sse/src/main/resources/META-INF/cxf/bus-extensions.txt
+++ b/rt/rs/sse/src/main/resources/META-INF/cxf/bus-extensions.txt
@@ -1 +1 @@
-org.apache.cxf.transport.sse.SseHttpTransportFactory::true
\ No newline at end of file
+org.apache.cxf.transport.sse.SseProvidersExtension::true
\ No newline at end of file
diff --git a/systests/jaxrs/pom.xml b/systests/jaxrs/pom.xml
index ee2fc135dcd..73e3d0b4258 100644
--- a/systests/jaxrs/pom.xml
+++ b/systests/jaxrs/pom.xml
@@ -536,18 +536,15 @@
            <artifactId>snakeyaml</artifactId>
            <scope>test</scope>
         </dependency>
-        <dependency>
-            <!-- activate atmosphere. Its use can be disabled by setting
-                 org.apache.cxf.transport.websocket.atmosphere.disabled to "true" -->
-            <groupId>org.atmosphere</groupId>
-            <artifactId>atmosphere-runtime</artifactId>
-            <version>${cxf.atmosphere.version}</version>
-        </dependency>
         <dependency>
             <groupId>org.qi4j.library</groupId>
             <artifactId>org.qi4j.library.circuitbreaker</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git a/systests/rs-sse/rs-sse-base/pom.xml b/systests/rs-sse/rs-sse-base/pom.xml
index 04584c24ca8..260f609b23f 100644
--- a/systests/rs-sse/rs-sse-base/pom.xml
+++ b/systests/rs-sse/rs-sse-base/pom.xml
@@ -49,5 +49,9 @@
             <artifactId>cxf-testutils</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
     </dependencies>
 </project>
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
index 5c8bf99c29c..1e32b4bef07 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
@@ -177,6 +177,29 @@ public void testBooksAreReturned() throws JsonProcessingException {
         r.close();
     }
 
+    @Test
+    public void testBooksStreamIsReturnedFromInboundSseEventsNoDelay() throws InterruptedException {
+        final WebTarget target = createWebTarget("/rest/api/bookstore/nodelay/sse/0");
+        final Collection<Book> books = new ArrayList<>();
+        
+        try (SseEventSource eventSource = SseEventSource.target(target).build()) {
+            eventSource.register(collect(books), System.out::println);
+            eventSource.open();
+            // Give the SSE stream some time to collect all events
+            awaitEvents(5000, books, 5);
+        }
+        // Easing the test verification here, it does not work well for Atm + Jetty
+        assertThat(books, 
+            hasItems(
+                new Book("New Book #1", 1), 
+                new Book("New Book #2", 2), 
+                new Book("New Book #3", 3), 
+                new Book("New Book #4", 4),
+                new Book("New Book #5", 5)
+            )
+        );
+    }
+
     private static Consumer<InboundSseEvent> collect(final Collection< Book > books) {
         return event -> books.add(event.readData(Book.class, MediaType.APPLICATION_JSON_TYPE));
     }
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
index ec6b1c0dc9e..ad2387f2c7a 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
@@ -20,6 +20,7 @@
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -92,6 +93,23 @@ public void run() {
             }
         }.start();
     }
+    
+    @GET
+    @Path("nodelay/sse/{id}")
+    @Produces(MediaType.SERVER_SENT_EVENTS)
+    public void forBookNoDelay(@Context SseEventSink sink, @PathParam("id") final String id) {
+        final Builder builder = sse.newEventBuilder();
+        
+        CompletableFuture
+            .runAsync(() -> {
+                sink.send(createStatsEvent(builder.name("book"), 1));
+                sink.send(createStatsEvent(builder.name("book"), 2));
+                sink.send(createStatsEvent(builder.name("book"), 3));
+                sink.send(createStatsEvent(builder.name("book"), 4));
+                sink.send(createStatsEvent(builder.name("book"), 5));
+            })
+            .whenComplete((r, ex) -> sink.close());
+    }
 
     @GET
     @Path("nodata")
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
index dc7687642aa..f78c5ceac17 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
@@ -20,6 +20,7 @@
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -92,6 +93,23 @@ public void run() {
         }.start();
     }
 
+    @GET
+    @Path("nodelay/sse/{id}")
+    @Produces(MediaType.SERVER_SENT_EVENTS)
+    public void forBookNoDelay(@Context SseEventSink sink, @PathParam("id") final String id) {
+        final Builder builder = sse.newEventBuilder();
+        
+        CompletableFuture
+            .runAsync(() -> {
+                sink.send(createStatsEvent(builder.name("book"), 1));
+                sink.send(createStatsEvent(builder.name("book"), 2));
+                sink.send(createStatsEvent(builder.name("book"), 3));
+                sink.send(createStatsEvent(builder.name("book"), 4));
+                sink.send(createStatsEvent(builder.name("book"), 5));
+            })
+            .whenComplete((r, ex) -> sink.close());
+    }
+
     @GET
     @Path("broadcast/sse")
     @Produces(MediaType.SERVER_SENT_EVENTS)
diff --git a/systests/rs-sse/rs-sse-jetty/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/AbstractJettyServer.java b/systests/rs-sse/rs-sse-jetty/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/AbstractJettyServer.java
index a47c0c3018b..89deec5a26b 100644
--- a/systests/rs-sse/rs-sse-jetty/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/AbstractJettyServer.java
+++ b/systests/rs-sse/rs-sse-jetty/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/AbstractJettyServer.java
@@ -24,7 +24,6 @@
 import org.apache.cxf.jaxrs.servlet.CXFNonSpringJaxrsServlet;
 import org.apache.cxf.systest.jaxrs.sse.BookStore;
 import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
-import org.apache.cxf.transport.sse.SseHttpTransportFactory;
 import org.eclipse.jetty.server.Handler;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.handler.DefaultHandler;
@@ -57,7 +56,6 @@ protected void run() {
             if (resourcePath == null) {
                 // Register and map the dispatcher servlet
                 final ServletHolder holder = new ServletHolder(new CXFNonSpringJaxrsServlet());
-                holder.setInitParameter(CXFNonSpringJaxrsServlet.TRANSPORT_ID, SseHttpTransportFactory.TRANSPORT_ID);
                 holder.setInitParameter("jaxrs.serviceClasses", BookStore.class.getName());
                 holder.setInitParameter("jaxrs.providers", JacksonJsonProvider.class.getName());
                 final ServletContextHandler context = new ServletContextHandler();
diff --git a/systests/rs-sse/rs-sse-jetty/src/test/resources/jaxrs_sse/WEB-INF/web.xml b/systests/rs-sse/rs-sse-jetty/src/test/resources/jaxrs_sse/WEB-INF/web.xml
index 64746acb263..8133ebf4ca4 100644
--- a/systests/rs-sse/rs-sse-jetty/src/test/resources/jaxrs_sse/WEB-INF/web.xml
+++ b/systests/rs-sse/rs-sse-jetty/src/test/resources/jaxrs_sse/WEB-INF/web.xml
@@ -7,10 +7,6 @@
 		<servlet-class>org.apache.cxf.jaxrs.servlet.CXFNonSpringJaxrsServlet</servlet-class>    
 		<load-on-startup>1</load-on-startup>
 		<async-supported>true</async-supported>
-		<init-param>
-			<param-name>transportId</param-name>
-			<param-value>http://cxf.apache.org/transports/http/sse</param-value>
-		</init-param>
 		<init-param>
 			<param-name>javax.ws.rs.Application</param-name>
 			<param-value>org.apache.cxf.systest.jaxrs.sse.SseApplication</param-value>
diff --git a/systests/rs-sse/rs-sse-tomcat/src/test/java/org/apache/cxf/systest/jaxrs/sse/tomcat/AbstractTomcatServer.java b/systests/rs-sse/rs-sse-tomcat/src/test/java/org/apache/cxf/systest/jaxrs/sse/tomcat/AbstractTomcatServer.java
index ded2a64c4b1..799a13478a8 100644
--- a/systests/rs-sse/rs-sse-tomcat/src/test/java/org/apache/cxf/systest/jaxrs/sse/tomcat/AbstractTomcatServer.java
+++ b/systests/rs-sse/rs-sse-tomcat/src/test/java/org/apache/cxf/systest/jaxrs/sse/tomcat/AbstractTomcatServer.java
@@ -30,7 +30,6 @@
 import org.apache.cxf.jaxrs.servlet.CXFNonSpringJaxrsServlet;
 import org.apache.cxf.systest.jaxrs.sse.BookStore;
 import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
-import org.apache.cxf.transport.sse.SseHttpTransportFactory;
 
 public abstract class AbstractTomcatServer extends AbstractBusTestServerBase {
 
@@ -60,8 +59,6 @@ protected void run() {
             if (resourcePath == null) {
                 final Context context = server.addContext("/", base.getAbsolutePath());
                 final Wrapper cxfServlet = Tomcat.addServlet(context, "cxfServlet", new CXFNonSpringJaxrsServlet());
-                cxfServlet.addInitParameter(CXFNonSpringJaxrsServlet.TRANSPORT_ID,
-                    SseHttpTransportFactory.TRANSPORT_ID);
                 cxfServlet.addInitParameter("jaxrs.serviceClasses", BookStore.class.getName());
                 cxfServlet.addInitParameter("jaxrs.providers", JacksonJsonProvider.class.getName());
                 cxfServlet.setAsyncSupported(true);
diff --git a/systests/rs-sse/rs-sse-tomcat/src/test/resources/jaxrs_sse/WEB-INF/web.xml b/systests/rs-sse/rs-sse-tomcat/src/test/resources/jaxrs_sse/WEB-INF/web.xml
index 64746acb263..8133ebf4ca4 100644
--- a/systests/rs-sse/rs-sse-tomcat/src/test/resources/jaxrs_sse/WEB-INF/web.xml
+++ b/systests/rs-sse/rs-sse-tomcat/src/test/resources/jaxrs_sse/WEB-INF/web.xml
@@ -7,10 +7,6 @@
 		<servlet-class>org.apache.cxf.jaxrs.servlet.CXFNonSpringJaxrsServlet</servlet-class>    
 		<load-on-startup>1</load-on-startup>
 		<async-supported>true</async-supported>
-		<init-param>
-			<param-name>transportId</param-name>
-			<param-value>http://cxf.apache.org/transports/http/sse</param-value>
-		</init-param>
 		<init-param>
 			<param-name>javax.ws.rs.Application</param-name>
 			<param-value>org.apache.cxf.systest.jaxrs.sse.SseApplication</param-value>
diff --git a/systests/rs-sse/rs-sse-undertow/src/test/java/org/apache/cxf/systest/jaxrs/sse/undertow/AbstractUndertowServer.java b/systests/rs-sse/rs-sse-undertow/src/test/java/org/apache/cxf/systest/jaxrs/sse/undertow/AbstractUndertowServer.java
index c6f0a8dbf57..c24dfef1afb 100644
--- a/systests/rs-sse/rs-sse-undertow/src/test/java/org/apache/cxf/systest/jaxrs/sse/undertow/AbstractUndertowServer.java
+++ b/systests/rs-sse/rs-sse-undertow/src/test/java/org/apache/cxf/systest/jaxrs/sse/undertow/AbstractUndertowServer.java
@@ -24,7 +24,6 @@
 import org.apache.cxf.jaxrs.servlet.CXFNonSpringJaxrsServlet;
 import org.apache.cxf.systest.jaxrs.sse.BookStore;
 import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
-import org.apache.cxf.transport.sse.SseHttpTransportFactory;
 
 import io.undertow.Handlers;
 import io.undertow.Undertow;
@@ -54,7 +53,6 @@ protected void run() {
                 .setDeploymentName("sse-test")
                 .addServlets(
                     servlet("MessageServlet", CXFNonSpringJaxrsServlet.class)
-                        .addInitParam(CXFNonSpringJaxrsServlet.TRANSPORT_ID, SseHttpTransportFactory.TRANSPORT_ID)
                         .addInitParam("jaxrs.providers", JacksonJsonProvider.class.getName())
                         .addInitParam("jaxrs.serviceClasses", BookStore.class.getName())
                         .setAsyncSupported(true)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services