You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@cxf.apache.org by GitBox <gi...@apache.org> on 2018/05/03 02:53:09 UTC

[GitHub] reta closed pull request #398: [Draft] SSE implementation using AsyncContext

reta closed pull request #398: [Draft] SSE implementation using AsyncContext
URL: https://github.com/apache/cxf/pull/398
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a forked repository, GitHub will not
display its diff after the merge, so the full diff is reproduced below:

diff --git a/rt/rs/sse/pom.xml b/rt/rs/sse/pom.xml
index 1d254f8fef8..cfd3fb5ebf8 100644
--- a/rt/rs/sse/pom.xml
+++ b/rt/rs/sse/pom.xml
@@ -57,10 +57,6 @@
             <artifactId>${cxf.servlet-api.artifact}</artifactId>
             <scope>provided</scope>
         </dependency>
-        <dependency>
-            <groupId>org.atmosphere</groupId>
-            <artifactId>atmosphere-runtime</artifactId>
-        </dependency>
         <dependency>
             <groupId>org.mockito</groupId>
             <artifactId>mockito-core</artifactId>
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventBodyWriter.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventBodyWriter.java
index a97020f8e8a..28ac1a4a447 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventBodyWriter.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventBodyWriter.java
@@ -112,6 +112,8 @@ public void writeTo(OutboundSseEvent p, Class<?> cls, Type t, Annotation[] anns,
             writePayloadTo(payloadClass, payloadType, anns, p.getMediaType(), headers, p.getData(), os);
             os.write(NEW_LINE);
         }
+
+        os.write(NEW_LINE);
     }
 
     @SuppressWarnings("unchecked")
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
index 38842540822..823c47073d9 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.cxf.jaxrs.sse;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Set;
@@ -27,6 +28,9 @@
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
+import javax.servlet.AsyncContext;
+import javax.servlet.AsyncEvent;
+import javax.servlet.AsyncListener;
 import javax.ws.rs.sse.OutboundSseEvent;
 import javax.ws.rs.sse.SseBroadcaster;
 import javax.ws.rs.sse.SseEventSink;
@@ -35,20 +39,56 @@
     private final Set<SseEventSink> subscribers = new CopyOnWriteArraySet<>();
 
     private final Set<Consumer<SseEventSink>> closers =
-            new CopyOnWriteArraySet<>();
+        new CopyOnWriteArraySet<>();
 
     private final Set<BiConsumer<SseEventSink, Throwable>> exceptioners =
-            new CopyOnWriteArraySet<>();
+        new CopyOnWriteArraySet<>();
+
+    private volatile boolean closed;
 
     @Override
     public void register(SseEventSink sink) {
+        if (closed) {
+            throw new IllegalStateException("Already closed");
+        }
+
+        SseEventSinkImpl sinkImpl = (SseEventSinkImpl)sink;
+
+        AsyncContext asyncContext = sinkImpl.getAsyncContext();
+
+        asyncContext.addListener(new AsyncListener() {
+            @Override
+            public void onComplete(AsyncEvent asyncEvent) throws IOException {
+                subscribers.remove(sink);
+            }
+
+            @Override
+            public void onTimeout(AsyncEvent asyncEvent) throws IOException {
+                subscribers.remove(sink);
+            }
+
+            @Override
+            public void onError(AsyncEvent asyncEvent) throws IOException {
+                subscribers.remove(sink);
+            }
+
+            @Override
+            public void onStartAsync(AsyncEvent asyncEvent) throws IOException {
+
+            }
+        });
+
         subscribers.add(sink);
     }
 
     @Override
     public CompletionStage<?> broadcast(OutboundSseEvent event) {
+        if (closed) {
+            throw new IllegalStateException("Already closed");
+        }
+
         final Collection<CompletableFuture<?>> futures = new ArrayList<>();
-        
+
         for (SseEventSink sink: subscribers) {
             try {
                 futures.add(sink.send(event).toCompletableFuture());
@@ -62,16 +102,26 @@ public void register(SseEventSink sink) {
 
     @Override
     public void onClose(Consumer<SseEventSink> subscriber) {
+        if (closed) {
+            throw new IllegalStateException("Already closed");
+        }
+
         closers.add(subscriber);
     }
 
     @Override
     public void onError(BiConsumer<SseEventSink, Throwable> exceptioner) {
+        if (closed) {
+            throw new IllegalStateException("Already closed");
+        }
+
         exceptioners.add(exceptioner);
     }
 
     @Override
     public void close() {
+        closed = true;
+
         subscribers.forEach(subscriber -> {
             subscriber.close();
             closers.forEach(closer -> closer.accept(subscriber));
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkContextProvider.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventSinkContextProvider.java
similarity index 52%
rename from rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkContextProvider.java
rename to rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventSinkContextProvider.java
index 8d87a3dbefb..446557a5b70 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkContextProvider.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventSinkContextProvider.java
@@ -16,22 +16,30 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.cxf.jaxrs.sse.atmosphere;
 
+
+package org.apache.cxf.jaxrs.sse;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+
+import javax.servlet.AsyncContext;
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.ext.MessageBodyWriter;
 import javax.ws.rs.sse.OutboundSseEvent;
 import javax.ws.rs.sse.SseEventSink;
 
+import org.apache.cxf.interceptor.Fault;
 import org.apache.cxf.jaxrs.ext.ContextProvider;
 import org.apache.cxf.jaxrs.provider.ServerProviderFactory;
-import org.apache.cxf.jaxrs.sse.OutboundSseEventBodyWriter;
 import org.apache.cxf.message.Message;
+import org.apache.cxf.phase.Phase;
+import org.apache.cxf.phase.PhaseInterceptor;
 import org.apache.cxf.transport.http.AbstractHTTPDestination;
-import org.atmosphere.cpr.AtmosphereResource;
-import org.atmosphere.cpr.Broadcaster;
 
-public class SseAtmosphereEventSinkContextProvider implements ContextProvider<SseEventSink> {
+public class SseEventSinkContextProvider implements ContextProvider<SseEventSink> {
+
     @Override
     public SseEventSink createContext(Message message) {
         final HttpServletRequest request = (HttpServletRequest)message.get(AbstractHTTPDestination.HTTP_REQUEST);
@@ -39,23 +47,54 @@ public SseEventSink createContext(Message message) {
             throw new IllegalStateException("Unable to retrieve HTTP request from the context");
         }
 
-        final AtmosphereResource resource = (AtmosphereResource)request
-            .getAttribute(AtmosphereResource.class.getName());
-        if (resource == null) {
-            throw new IllegalStateException("AtmosphereResource is not present, "
-                    + "is AtmosphereServlet configured properly?");
+        final MessageBodyWriter<OutboundSseEvent> writer = new OutboundSseEventBodyWriter(
+            ServerProviderFactory.getInstance(message), message.getExchange());
+
+        AsyncContext ctx = request.startAsync();
+        ctx.setTimeout(0);
+
+        message.getInterceptorChain().add(new SuspendPhaseInterceptor());
+
+        return new SseEventSinkImpl(writer, ctx);
+    }
+
+    private static class SuspendPhaseInterceptor
+        implements PhaseInterceptor<Message> {
+
+        @Override
+        public Set<String> getAfter() {
+            return Collections.emptySet();
         }
 
-        final Broadcaster broadcaster = resource.getAtmosphereConfig()
-            .getBroadcasterFactory()
-            .lookup(resource.uuid(), true);
+        @Override
+        public Set<String> getBefore() {
+            return Collections.singleton(
+                "org.apache.cxf.interceptor.OutgoingChainInterceptor");
+        }
 
-        resource.removeFromAllBroadcasters();
-        resource.setBroadcaster(broadcaster);
+        @Override
+        public String getId() {
+            return "SSE SUSPEND";
+        }
 
-        final MessageBodyWriter<OutboundSseEvent> writer = new OutboundSseEventBodyWriter(
-            ServerProviderFactory.getInstance(message), message.getExchange());
+        @Override
+        public String getPhase() {
+            return Phase.POST_INVOKE;
+        }
+
+        @Override
+        public Collection<PhaseInterceptor<? extends Message>> getAdditionalInterceptors() {
+            return Collections.emptySet();
+        }
+
+        @Override
+        public void handleMessage(Message message) throws Fault {
+            message.getInterceptorChain().suspend();
+        }
+
+        @Override
+        public void handleFault(Message message) {
+        }
 
-        return new SseAtmosphereEventSinkImpl(writer, resource);
     }
-}
+}
\ No newline at end of file
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventSinkImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventSinkImpl.java
new file mode 100644
index 00000000000..149723f681d
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventSinkImpl.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.jaxrs.sse;
+
+import java.lang.annotation.Annotation;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.logging.Logger;
+
+import javax.servlet.AsyncContext;
+import javax.ws.rs.ext.MessageBodyWriter;
+import javax.ws.rs.sse.OutboundSseEvent;
+import javax.ws.rs.sse.SseEventSink;
+
+import org.apache.cxf.common.logging.LogUtils;
+
+public class SseEventSinkImpl implements SseEventSink {
+    private static final Logger LOG = LogUtils.getL7dLogger(SseEventSinkImpl.class);
+
+    private final AsyncContext ctx;
+    private final MessageBodyWriter<OutboundSseEvent> writer;
+    private final Queue<QueuedEvent> queuedEvents;
+    private boolean dequeueing;
+    private volatile boolean closed;
+
+    public SseEventSinkImpl(final MessageBodyWriter<OutboundSseEvent> writer,
+                            final AsyncContext ctx) {
+        this.writer = writer;
+        this.queuedEvents = new LinkedList<>();
+        this.ctx = ctx;
+
+        if (ctx == null) {
+            throw new IllegalStateException("Unable to retrieve the AsyncContext for this request. "
+                + "Is the Servlet configured properly?");
+        }
+
+        ctx.getResponse().setContentType(OutboundSseEventBodyWriter.SERVER_SENT_EVENTS);
+    }
+
+    public AsyncContext getAsyncContext() {
+        return ctx;
+    }
+
+    @Override
+    public void close() {
+        if (!closed) {
+            closed = true;
+
+            try {
+                ctx.complete();
+            } catch (final Exception ex) {
+                LOG.warning("Failed to close the AsyncContext cleanly: "
+                    + ex.getMessage());
+            }
+        }
+    }
+
+    @Override
+    public boolean isClosed() {
+        return closed;
+    }
+
+    @Override
+    public CompletionStage<?> send(OutboundSseEvent event) {
+        final CompletableFuture<?> future = new CompletableFuture<>();
+
+        if (!closed && writer != null) {
+
+            boolean startDequeue;
+            synchronized (this) {
+                queuedEvents.offer(new QueuedEvent(event, future));
+                if (dequeueing) {
+                    startDequeue = false;
+                } else {
+                    startDequeue = true;
+                    dequeueing = true;
+                }
+            }
+
+            if (startDequeue) {
+                ctx.start(this::dequeue);
+            }
+        } else {
+            future.complete(null);
+        }
+
+        return future;
+    }
+
+    private void dequeue() {
+
+        for (;;) {
+            QueuedEvent qe;
+            synchronized (this) {
+                qe = queuedEvents.poll();
+                if (qe == null) {
+                    dequeueing = false;
+                    break;
+                }
+            }
+            OutboundSseEvent event = qe.event;
+            CompletableFuture<?> future = qe.completion;
+
+            try {
+                writer.writeTo(
+                    event, event.getClass(), event.getGenericType(), new Annotation [] {},
+                    event.getMediaType(), null, ctx.getResponse().getOutputStream());
+                ctx.getResponse().flushBuffer();
+                future.complete(null);
+
+            } catch (final Exception ex) {
+                future.completeExceptionally(ex);
+            }
+
+        }
+    }
+
+    private static class QueuedEvent {
+
+        final OutboundSseEvent event;
+
+        final CompletableFuture<?> completion;
+
+        QueuedEvent(OutboundSseEvent event, CompletableFuture<?> completion) {
+            this.event = event;
+            this.completion = completion;
+        }
+
+    }
+
+}
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java
index 36ea9ede1f1..9ecbe27c64e 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java
@@ -28,7 +28,6 @@
 import org.apache.cxf.endpoint.Server;
 import org.apache.cxf.feature.AbstractFeature;
 import org.apache.cxf.jaxrs.provider.ServerProviderFactory;
-import org.apache.cxf.jaxrs.sse.atmosphere.SseAtmosphereEventSinkContextProvider;
 
 @Provider(value = Type.Feature, scope = Scope.Server)
 public class SseFeature extends AbstractFeature {
@@ -36,8 +35,8 @@
     public void initialize(Server server, Bus bus) {
         final List<Object> providers = new ArrayList<>();
 
-        providers.add(new SseAtmosphereEventSinkContextProvider());
         providers.add(new SseContextProvider());
+        providers.add(new SseEventSinkContextProvider());
 
         ((ServerProviderFactory) server.getEndpoint().get(
             ServerProviderFactory.class.getName())).setUserProviders(providers);
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkImpl.java
deleted file mode 100644
index af984e79daf..00000000000
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkImpl.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.sse.atmosphere;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.lang.annotation.Annotation;
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.logging.Logger;
-
-import javax.ws.rs.ext.MessageBodyWriter;
-import javax.ws.rs.sse.OutboundSseEvent;
-import javax.ws.rs.sse.SseEventSink;
-
-import org.apache.cxf.common.logging.LogUtils;
-import org.atmosphere.cpr.AtmosphereResource;
-import org.atmosphere.cpr.AtmosphereResponse;
-import org.atmosphere.cpr.Broadcaster;
-
-import static org.atmosphere.cpr.ApplicationConfig.PROPERTY_USE_STREAM;
-
-public class SseAtmosphereEventSinkImpl implements SseEventSink {
-    private static final Logger LOG = LogUtils.getL7dLogger(SseAtmosphereEventSinkImpl.class);
-
-    private final AtmosphereResource resource;
-    private final MessageBodyWriter<OutboundSseEvent> writer;
-    private final boolean usingStream;
-    
-    private volatile boolean closed;
-
-    public SseAtmosphereEventSinkImpl(final MessageBodyWriter<OutboundSseEvent> writer,
-            final AtmosphereResource resource) {
-        this.writer = writer;
-        this.resource = resource;
-        this.usingStream = (Boolean)resource.getRequest().getAttribute(PROPERTY_USE_STREAM);
-
-        if (!resource.isSuspended()) {
-            LOG.fine("Atmosphere resource is not suspended, suspending");
-            resource.suspend();
-        }
-    }
-
-    @Override
-    public void close() {
-        if (!closed) {
-            closed = true;
-
-            LOG.fine("Closing Atmosphere SSE event output");
-            if (resource.isSuspended()) {
-                LOG.fine("Atmosphere resource is suspended, resuming");
-                resource.resume();
-            }
-
-            final Broadcaster broadcaster = resource.getBroadcaster();
-            resource.removeFromAllBroadcasters();
-
-            try {
-                final AtmosphereResponse response = resource.getResponse();
-                
-                try {
-                    // The resource's request property is null here, we have to
-                    // rely on initial request to understand what to close, response
-                    // or stream.
-                    if (usingStream) {
-                        response.getOutputStream().close();
-                    } else {
-                        response.getWriter().close();
-                    }                    
-                } catch (final IOException ex) {
-                    LOG.warning("Failed to flush AtmosphereResponse buffer: "
-                        + ex.getMessage());
-                }
-            } finally {
-                try {
-                    resource.close();
-                } catch (IOException ex) {
-                    // ignore
-                }
-                broadcaster.destroy();
-                LOG.fine("Atmosphere SSE event output is closed");
-            }
-        }
-    }
-
-    @Override
-    public CompletionStage<?> send(OutboundSseEvent event) {
-        final CompletableFuture<?> future = new CompletableFuture<>();
-        
-        if (!closed && writer != null) {
-            try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
-                writer.writeTo(event, event.getClass(), null, new Annotation [] {}, event.getMediaType(), null, os);
-
-                // Atmosphere broadcasts asynchronously which is acceptable in most cases.
-                // Unfortunately, calling close() may lead to response stream being closed
-                // while there are still some SSE delivery scheduled.
-                return CompletableFuture.completedFuture(
-                    resource
-                        .getBroadcaster()
-                        .broadcast(os.toString(StandardCharsets.UTF_8.name()))
-                        .get(1, TimeUnit.SECONDS)
-                    );
-            } catch (final IOException ex) {
-                LOG.warning("While writing the SSE event, an exception was raised: " + ex);
-                future.completeExceptionally(ex);
-            } catch (final ExecutionException | InterruptedException ex) {
-                LOG.warning("SSE Atmosphere response was not delivered");
-                future.completeExceptionally(ex);
-            } catch (final TimeoutException ex) {
-                LOG.warning("SSE Atmosphere response was not delivered within default timeout");
-                future.completeExceptionally(ex);
-            }
-        } else {
-            future.complete(null);
-        }
-        
-        return future;
-    }
-
-    @Override
-    public boolean isClosed() {
-        return closed;
-    }
-}
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptor.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptor.java
deleted file mode 100644
index ab8ef5c5ee8..00000000000
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptor.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.sse.atmosphere;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.PrintWriter;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.cxf.common.logging.LogUtils;
-import org.atmosphere.cpr.Action;
-import org.atmosphere.cpr.AsyncIOInterceptorAdapter;
-import org.atmosphere.cpr.AsyncIOWriter;
-import org.atmosphere.cpr.AtmosphereInterceptorWriter;
-import org.atmosphere.cpr.AtmosphereRequest;
-import org.atmosphere.cpr.AtmosphereResource;
-import org.atmosphere.cpr.AtmosphereResourceEvent;
-import org.atmosphere.cpr.AtmosphereResourceEventListenerAdapter.OnPreSuspend;
-import org.atmosphere.cpr.AtmosphereResponse;
-import org.atmosphere.interceptor.AllowInterceptor;
-import org.atmosphere.interceptor.SSEAtmosphereInterceptor;
-import org.atmosphere.util.Utils;
-
-import static org.apache.cxf.jaxrs.sse.OutboundSseEventBodyWriter.SERVER_SENT_EVENTS;
-import static org.atmosphere.cpr.ApplicationConfig.PROPERTY_USE_STREAM;
-import static org.atmosphere.cpr.FrameworkConfig.CALLBACK_JAVASCRIPT_PROTOCOL;
-import static org.atmosphere.cpr.FrameworkConfig.CONTAINER_RESPONSE;
-
-/**
- * Most of this class implementation is borrowed from SSEAtmosphereInterceptor. The original
- * implementation does two things which do not fit well into SSE support:
- *  - closes the response stream (overridden by SseAtmosphereInterceptorWriter)
- *  - wraps the whatever object is being written to SSE payload (overridden using
- *    the complete SSE protocol)
- */
-public class SseAtmosphereInterceptor extends SSEAtmosphereInterceptor {
-    private static final Logger LOG = LogUtils.getL7dLogger(SseAtmosphereInterceptor.class);
-
-    private static final byte[] PADDING;
-    private static final String PADDING_TEXT;
-    private static final byte[] END = "\r\n\r\n".getBytes();
-
-    static {
-        StringBuffer whitespace = new StringBuffer();
-        for (int i = 0; i < 2000; i++) {
-            whitespace.append(" ");
-        }
-        whitespace.append("\n");
-        PADDING_TEXT = whitespace.toString();
-        PADDING = PADDING_TEXT.getBytes();
-    }
-
-    private boolean writePadding(AtmosphereResponse response) {
-        if (response.request() != null && response.request().getAttribute("paddingWritten") != null) {
-            return false;
-        }
-
-        response.setContentType(SERVER_SENT_EVENTS);
-        response.setCharacterEncoding("utf-8");
-        boolean isUsingStream = (Boolean) response.request().getAttribute(PROPERTY_USE_STREAM);
-        if (isUsingStream) {
-            try {
-                OutputStream stream = response.getResponse().getOutputStream();
-                try {
-                    stream.write(PADDING);
-                    stream.flush();
-                } catch (IOException ex) {
-                    LOG.log(Level.WARNING, "SSE may not work", ex);
-                }
-            } catch (IOException e) {
-                LOG.log(Level.FINEST, "", e);
-            }
-        } else {
-            try {
-                PrintWriter w = response.getResponse().getWriter();
-                w.println(PADDING_TEXT);
-                w.flush();
-            } catch (IOException e) {
-                LOG.log(Level.FINEST, "", e);
-            }
-        }
-        response.resource().getRequest().setAttribute("paddingWritten", "true");
-        return true;
-    }
-
-    @Override
-    public Action inspect(final AtmosphereResource r) {
-        if (Utils.webSocketMessage(r)) {
-            return Action.CONTINUE;
-        }
-
-        final AtmosphereRequest request = r.getRequest();
-        final String accept = request.getHeader("Accept") == null ? "text/plain" : request.getHeader("Accept").trim();
-
-        if (r.transport().equals(AtmosphereResource.TRANSPORT.SSE) || SERVER_SENT_EVENTS.equalsIgnoreCase(accept)) {
-            final AtmosphereResponse response = r.getResponse();
-            if (response.getAsyncIOWriter() == null) {
-                response.asyncIOWriter(new SseAtmosphereInterceptorWriter());
-            }
-
-            r.addEventListener(new P(response));
-
-            AsyncIOWriter writer = response.getAsyncIOWriter();
-            if (AtmosphereInterceptorWriter.class.isAssignableFrom(writer.getClass())) {
-                AtmosphereInterceptorWriter.class.cast(writer).interceptor(new AsyncIOInterceptorAdapter() {
-                    private boolean padding() {
-                        if (!r.isSuspended()) {
-                            return writePadding(response);
-                        }
-                        return false;
-                    }
-
-                    @Override
-                    public void prePayload(AtmosphereResponse response, byte[] data, int offset, int length) {
-                        padding();
-                    }
-
-                    @Override
-                    public void postPayload(AtmosphereResponse response, byte[] data, int offset, int length) {
-                        // The CALLBACK_JAVASCRIPT_PROTOCOL may be called by a framework running on top of Atmosphere
-                        // In that case, we must pad/protocol indenendently of the state of the AtmosphereResource
-                        if (r.isSuspended() || r.getRequest().getAttribute(CALLBACK_JAVASCRIPT_PROTOCOL) != null
-                                || r.getRequest().getAttribute(CONTAINER_RESPONSE) != null) {
-                            response.write(END, true);
-                        }
-
-                        /**
-                         * When used with https://github.com/remy/polyfills/blob/master/EventSource.js , we
-                         * resume after every message.
-                         */
-                        String ua = r.getRequest().getHeader("User-Agent");
-                        if (ua != null && ua.contains("MSIE")) {
-                            try {
-                                response.flushBuffer();
-                            } catch (IOException e) {
-                                LOG.log(Level.FINEST, "", e);
-                            }
-                            r.resume();
-                        }
-                    }
-                });
-            } else {
-                LOG.warning(String.format("Unable to apply %s. Your AsyncIOWriter must implement %s",
-                    getClass().getName(), AtmosphereInterceptorWriter.class.getName()));
-            }
-        }
-
-        return Action.CONTINUE;
-    }
-
-    private final class P extends OnPreSuspend implements AllowInterceptor {
-
-        private final AtmosphereResponse response;
-
-        private P(AtmosphereResponse response) {
-            this.response = response;
-        }
-
-        @Override
-        public void onPreSuspend(AtmosphereResourceEvent event) {
-            writePadding(response);
-        }
-    }
-}
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/ext/SseTransportCustomizationExtension.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/ext/SseTransportCustomizationExtension.java
index b0123d03ca3..c6267d49aae 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/ext/SseTransportCustomizationExtension.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/ext/SseTransportCustomizationExtension.java
@@ -18,24 +18,15 @@
  */
 package org.apache.cxf.jaxrs.sse.ext;
 
-import org.apache.cxf.Bus;
 import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
 import org.apache.cxf.jaxrs.ext.JAXRSServerFactoryCustomizationExtension;
 import org.apache.cxf.jaxrs.sse.SseContextProvider;
-import org.apache.cxf.jaxrs.sse.atmosphere.SseAtmosphereEventSinkContextProvider;
-import org.apache.cxf.transport.AbstractTransportFactory;
-import org.apache.cxf.transport.sse.SseHttpTransportFactory;
+import org.apache.cxf.jaxrs.sse.SseEventSinkContextProvider;
 
 public class SseTransportCustomizationExtension implements JAXRSServerFactoryCustomizationExtension {
     @Override
     public void customize(final JAXRSServerFactoryBean bean) {
-        bean.setTransportId(SseHttpTransportFactory.TRANSPORT_ID);
         bean.setProvider(new SseContextProvider());
-        bean.setProvider(new SseAtmosphereEventSinkContextProvider());
-        
-        final Bus bus = bean.getBus();
-        if (bus != null) {
-            bus.setProperty(AbstractTransportFactory.PREFERRED_TRANSPORT_ID, SseHttpTransportFactory.TRANSPORT_ID);
-        }
+        bean.setProvider(new SseEventSinkContextProvider());
     }
 }
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseDestinationFactory.java b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseDestinationFactory.java
deleted file mode 100644
index b1ca29d32d8..00000000000
--- a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseDestinationFactory.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.transport.sse;
-
-import java.io.IOException;
-import java.util.logging.Logger;
-
-import org.apache.cxf.Bus;
-import org.apache.cxf.common.logging.LogUtils;
-import org.apache.cxf.service.model.EndpointInfo;
-import org.apache.cxf.transport.DestinationFactory;
-import org.apache.cxf.transport.DestinationFactoryManager;
-import org.apache.cxf.transport.http.AbstractHTTPDestination;
-import org.apache.cxf.transport.http.DestinationRegistry;
-import org.apache.cxf.transport.http.HTTPTransportFactory;
-import org.apache.cxf.transport.http.HttpDestinationFactory;
-import org.apache.cxf.transport.sse.atmosphere.AtmosphereSseServletDestination;
-
-public class SseDestinationFactory implements HttpDestinationFactory {
-    private static final Logger LOG = LogUtils.getL7dLogger(SseDestinationFactory.class);
-    
-    @Override
-    public AbstractHTTPDestination createDestination(EndpointInfo endpointInfo, Bus bus, 
-            DestinationRegistry registry) throws IOException {
-        return new AtmosphereSseServletDestination(bus, getDestinationRegistry(bus), endpointInfo, 
-            endpointInfo.getAddress());
-    } 
-    
-    /**
-     * The destination factory should be taken from HTTP transport as a workaround to 
-     * register the destinations properly in the OSGi container.
-     */
-    private static DestinationRegistry getDestinationRegistry(Bus bus) {
-        DestinationFactoryManager dfm = bus.getExtension(DestinationFactoryManager.class);
-        
-        try {
-            DestinationFactory df = dfm.getDestinationFactory("http://cxf.apache.org/transports/http/configuration");
-            if (df instanceof HTTPTransportFactory) {
-                HTTPTransportFactory transportFactory = (HTTPTransportFactory)df;
-                return transportFactory.getRegistry();
-            }
-        } catch (final Exception ex) {
-            LOG.warning("Unable to deduct the destination registry from HTTP transport: " + ex.getMessage());
-        }
-        
-        return null;
-    }
-
-
-}
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseHttpTransportFactory.java b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseHttpTransportFactory.java
deleted file mode 100644
index a1f0d68ab0c..00000000000
--- a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseHttpTransportFactory.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.transport.sse;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.cxf.Bus;
-import org.apache.cxf.common.injection.NoJSR250Annotations;
-import org.apache.cxf.service.model.EndpointInfo;
-import org.apache.cxf.transport.ConduitInitiator;
-import org.apache.cxf.transport.Destination;
-import org.apache.cxf.transport.DestinationFactory;
-import org.apache.cxf.transport.http.AbstractHTTPDestination;
-import org.apache.cxf.transport.http.DestinationRegistry;
-import org.apache.cxf.transport.http.HTTPTransportFactory;
-
-@NoJSR250Annotations
-public class SseHttpTransportFactory extends HTTPTransportFactory
-        implements ConduitInitiator, DestinationFactory {
-
-    public static final String TRANSPORT_ID = "http://cxf.apache.org/transports/http/sse";
-    public static final List<String> DEFAULT_NAMESPACES = Collections.unmodifiableList(Arrays.asList(
-        TRANSPORT_ID,
-        "http://cxf.apache.org/transports/http/sse/configuration"
-    ));
-    
-    private final SseDestinationFactory factory = new SseDestinationFactory();
-
-    public SseHttpTransportFactory() {
-        this(null);
-    }
-
-    public SseHttpTransportFactory(DestinationRegistry registry) {
-        super(DEFAULT_NAMESPACES, registry);
-    }
-
-    @Override
-    public Destination getDestination(EndpointInfo endpointInfo, Bus bus) throws IOException {
-        if (endpointInfo == null) {
-            throw new IllegalArgumentException("EndpointInfo cannot be null");
-        }
-        
-        // In order to register the destination in the OSGi container, we have to 
-        // include it into 2 registries basically: for HTTP transport and the current 
-        // one. The workaround is borrow from org.apache.cxf.transport.websocket.WebSocketTransportFactory,
-        // it seems like no better option exists at the moment.
-        synchronized (registry) {
-            AbstractHTTPDestination d = registry.getDestinationForPath(endpointInfo.getAddress());
-            
-            if (d == null) {
-                d = factory.createDestination(endpointInfo, bus, registry);
-                if (d == null) {
-                    throw new IOException("No destination available. The CXF SSE transport needs Atmosphere"
-                        + " dependencies to be available");
-                }
-                registry.addDestination(d);
-                configure(bus, d);
-                d.finalizeConfig();
-            }
-            
-            return d;
-        }
-    }
-}
\ No newline at end of file
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptorWriter.java b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseProvidersExtension.java
similarity index 60%
rename from rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptorWriter.java
rename to rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseProvidersExtension.java
index cfafe870c3c..3a9c1b7423e 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptorWriter.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseProvidersExtension.java
@@ -16,16 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.cxf.jaxrs.sse.atmosphere;
+package org.apache.cxf.transport.sse;
 
-import java.io.IOException;
+import java.util.Arrays;
 
-import org.atmosphere.cpr.AtmosphereInterceptorWriter;
-import org.atmosphere.cpr.AtmosphereResponse;
+import org.apache.cxf.Bus;
+import org.apache.cxf.buslifecycle.BusCreationListener;
+import org.apache.cxf.jaxrs.sse.SseContextProvider;
+import org.apache.cxf.jaxrs.sse.SseEventSinkContextProvider;
+
+public class SseProvidersExtension implements BusCreationListener {
 
-public class SseAtmosphereInterceptorWriter extends AtmosphereInterceptorWriter {
     @Override
-    public void close(AtmosphereResponse response) throws IOException {
-        // Do not close the response, keep output stream open
+    public void busCreated(Bus bus) {
+        bus.setProperty(
+            "org.apache.cxf.jaxrs.bus.providers",
+            Arrays.asList(new SseContextProvider(), new SseEventSinkContextProvider()));
     }
+    
 }
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/atmosphere/AtmosphereSseServletDestination.java b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/atmosphere/AtmosphereSseServletDestination.java
deleted file mode 100644
index add8afa08de..00000000000
--- a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/atmosphere/AtmosphereSseServletDestination.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cxf.transport.sse.atmosphere;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.List;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.servlet.ServletConfig;
-import javax.servlet.ServletContext;
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import javax.ws.rs.core.HttpHeaders;
-import javax.ws.rs.core.MediaType;
-
-import org.apache.cxf.Bus;
-import org.apache.cxf.common.logging.LogUtils;
-import org.apache.cxf.jaxrs.sse.SseFeature;
-import org.apache.cxf.jaxrs.sse.atmosphere.SseAtmosphereInterceptor;
-import org.apache.cxf.message.Message;
-import org.apache.cxf.service.model.EndpointInfo;
-import org.apache.cxf.transport.http.DestinationRegistry;
-import org.apache.cxf.transport.http.Headers;
-import org.apache.cxf.transport.servlet.ServletDestination;
-import org.atmosphere.cache.UUIDBroadcasterCache;
-import org.atmosphere.cpr.ApplicationConfig;
-import org.atmosphere.cpr.AtmosphereFramework;
-import org.atmosphere.cpr.AtmosphereRequestImpl;
-import org.atmosphere.cpr.AtmosphereResource;
-import org.atmosphere.cpr.AtmosphereResponseImpl;
-import org.atmosphere.handler.AbstractReflectorAtmosphereHandler;
-
-public class AtmosphereSseServletDestination extends ServletDestination {
-    private static final Logger LOG = LogUtils.getL7dLogger(AtmosphereSseServletDestination.class);
-
-    private AtmosphereFramework framework;
-
-    public AtmosphereSseServletDestination(Bus bus, DestinationRegistry registry,
-            EndpointInfo ei, String path) throws IOException {
-        super(bus, registry, ei, path);
-
-        framework = create();
-
-        bus.getFeatures().add(new SseFeature());
-    }
-
-    private AtmosphereFramework create() {
-        final AtmosphereFramework instance = new AtmosphereFramework(true, false);
-        
-        instance.interceptor(new SseAtmosphereInterceptor());
-        instance.addInitParameter(ApplicationConfig.PROPERTY_NATIVE_COMETSUPPORT, "true");
-        instance.addInitParameter(ApplicationConfig.WEBSOCKET_SUPPORT, "true");
-        instance.addInitParameter(ApplicationConfig.DISABLE_ATMOSPHEREINTERCEPTOR, "true");
-        instance.addInitParameter(ApplicationConfig.CLOSE_STREAM_ON_CANCEL, "true");
-        // Atmosphere does not limit amount of threads and application can crash in no time
-        // https://github.com/Atmosphere/atmosphere/wiki/Configuring-Atmosphere-for-Performance
-        instance.addInitParameter(ApplicationConfig.BROADCASTER_MESSAGE_PROCESSING_THREADPOOL_MAXSIZE, "20");
-        instance.addInitParameter(ApplicationConfig.BROADCASTER_ASYNC_WRITE_THREADPOOL_MAXSIZE, "20");
-        // workaround for atmosphere's jsr356 initialization requiring servletConfig
-        instance.addInitParameter(ApplicationConfig.WEBSOCKET_SUPPRESS_JSR356, "true");
-
-        instance.setBroadcasterCacheClassName(UUIDBroadcasterCache.class.getName());
-        instance.addAtmosphereHandler("/", new DestinationHandler());
-        
-        return instance;
-    }
-    
-    @Override
-    public void onServletConfigAvailable(ServletConfig config) throws ServletException {
-        // Very likely there is JSR-356 implementation available, let us reconfigure the Atmosphere framework
-        // to use it since ServletConfig instance is already available.
-        final Object container = config.getServletContext()
-            .getAttribute("javax.websocket.server.ServerContainer");
-
-        if (container != null) {
-            if (framework.initialized()) {
-                framework.destroy();
-            }
-            
-            framework = create();
-            framework.addInitParameter(ApplicationConfig.PROPERTY_NATIVE_COMETSUPPORT, "false");
-            framework.addInitParameter(ApplicationConfig.WEBSOCKET_SUPPRESS_JSR356, "false");
-            
-            framework.init(config);
-        }
-    }
-    
-    @Override
-    public void finalizeConfig() {
-        if (framework.initialized()) {
-            return;
-        }
-        
-        final ServletContext ctx = bus.getExtension(ServletContext.class);
-        if (ctx != null) {
-            try {
-                framework.init(new ServletConfig() {
-                    @Override
-                    public String getServletName() {
-                        return null;
-                    }
-                    @Override
-                    public ServletContext getServletContext() {
-                        return ctx;
-                    }
-                    @Override
-                    public String getInitParameter(String name) {
-                        return null;
-                    }
-
-                    @Override
-                    public Enumeration<String> getInitParameterNames() {
-                        return null;
-                    }
-                });
-            } catch (ServletException e) {
-                // TODO Auto-generated catch block
-                e.printStackTrace();
-            }
-        } else {
-            framework.init();
-        }
-    }
-
-    @Override
-    public void invoke(ServletConfig config, ServletContext context, HttpServletRequest req,
-            HttpServletResponse resp) throws IOException {
-        try {
-            framework.doCometSupport(AtmosphereRequestImpl.wrap(req), AtmosphereResponseImpl.wrap(resp));
-        } catch (ServletException e) {
-            throw new IOException(e);
-        }
-    }
-
-    @Override
-    public void shutdown() {
-        try {
-            framework.destroy();
-        } catch (Exception ex) {
-            LOG.warning("Graceful shutdown was not successful: " + ex.getMessage());
-        } finally {
-            super.shutdown();
-        }
-    }
-
-    private class DestinationHandler extends AbstractReflectorAtmosphereHandler {
-        @Override
-        public void onRequest(final AtmosphereResource resource) throws IOException {
-            LOG.fine("onRequest");
-            try {
-                AtmosphereSseServletDestination.super.invoke(null, resource.getRequest().getServletContext(),
-                    resource.getRequest(), resource.getResponse());
-            } catch (Exception e) {
-                LOG.log(Level.WARNING, "Failed to invoke service", e);
-            }
-        }
-    }
-
-    @Override
-    protected OutputStream flushHeaders(Message outMessage, boolean getStream) throws IOException {
-        adjustContentLength(outMessage);
-        return super.flushHeaders(outMessage, getStream);
-    }
-
-    @Override
-    protected OutputStream flushHeaders(Message outMessage) throws IOException {
-        adjustContentLength(outMessage);
-        return super.flushHeaders(outMessage);
-    }
-
-    /**
-     * It has been noticed that Jetty checks the "Content-Length" header and completes the
-     * response if its value is 0 (or matches the number of bytes written). However, in case
-     * of SSE the content length is unknown so we are setting it to -1 before flushing the
-     * response. Otherwise, only the first event is going to be sent and response is going to
-     * be closed.
-     */
-    private void adjustContentLength(Message outMessage) {
-        final String contentType = (String)outMessage.get(Message.CONTENT_TYPE);
-
-        if (MediaType.SERVER_SENT_EVENTS.equalsIgnoreCase(contentType)) {
-            final Map<String, List<String>> headers = Headers.getSetProtocolHeaders(outMessage);
-            headers.put(HttpHeaders.CONTENT_LENGTH, Collections.singletonList("-1"));
-        }
-    }
-}
diff --git a/rt/rs/sse/src/main/resources/META-INF/cxf/bus-extensions.txt b/rt/rs/sse/src/main/resources/META-INF/cxf/bus-extensions.txt
index 643b51c04e9..97a6b9f38b5 100644
--- a/rt/rs/sse/src/main/resources/META-INF/cxf/bus-extensions.txt
+++ b/rt/rs/sse/src/main/resources/META-INF/cxf/bus-extensions.txt
@@ -1 +1 @@
-org.apache.cxf.transport.sse.SseHttpTransportFactory::true
\ No newline at end of file
+org.apache.cxf.transport.sse.SseProvidersExtension::true
\ No newline at end of file
diff --git a/systests/jaxrs/pom.xml b/systests/jaxrs/pom.xml
index b2dffe3b57d..fe7572d43b4 100644
--- a/systests/jaxrs/pom.xml
+++ b/systests/jaxrs/pom.xml
@@ -536,18 +536,15 @@
            <artifactId>snakeyaml</artifactId>
            <scope>test</scope>
         </dependency>
-        <dependency>
-            <!-- activate atmosphere. Its use can be disabled by setting
-                 org.apache.cxf.transport.websocket.atmosphere.disabled to "true" -->
-            <groupId>org.atmosphere</groupId>
-            <artifactId>atmosphere-runtime</artifactId>
-            <version>${cxf.atmosphere.version}</version>
-        </dependency>
         <dependency>
             <groupId>org.qi4j.library</groupId>
             <artifactId>org.qi4j.library.circuitbreaker</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git a/systests/rs-sse/rs-sse-base/pom.xml b/systests/rs-sse/rs-sse-base/pom.xml
index 04584c24ca8..260f609b23f 100644
--- a/systests/rs-sse/rs-sse-base/pom.xml
+++ b/systests/rs-sse/rs-sse-base/pom.xml
@@ -49,5 +49,9 @@
             <artifactId>cxf-testutils</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
     </dependencies>
 </project>
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
index a0c3fd81028..8f22f40992f 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
@@ -124,6 +124,7 @@ public void stop() {
             broadcaster.broadcast(createStatsEvent(builder.name("book"), 1000));
             broadcaster.broadcast(createStatsEvent(builder.name("book"), 2000));
 
+            Thread.sleep(2000);
         } catch (final InterruptedException ex) {
             LOG.error("Wait has been interrupted", ex);
         }
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
index f97906caadd..9bafda4cbe7 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
@@ -123,6 +123,8 @@ public void stop() {
             broadcaster.broadcast(createStatsEvent(builder.name("book"), 1000));
             broadcaster.broadcast(createStatsEvent(builder.name("book"), 2000));
 
+            Thread.sleep(2000);
+
         } catch (final InterruptedException ex) {
             LOG.error("Wait has been interrupted", ex);
         }
diff --git a/systests/rs-sse/rs-sse-jetty/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/AbstractJettyServer.java b/systests/rs-sse/rs-sse-jetty/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/AbstractJettyServer.java
index a47c0c3018b..89deec5a26b 100644
--- a/systests/rs-sse/rs-sse-jetty/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/AbstractJettyServer.java
+++ b/systests/rs-sse/rs-sse-jetty/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/AbstractJettyServer.java
@@ -24,7 +24,6 @@
 import org.apache.cxf.jaxrs.servlet.CXFNonSpringJaxrsServlet;
 import org.apache.cxf.systest.jaxrs.sse.BookStore;
 import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
-import org.apache.cxf.transport.sse.SseHttpTransportFactory;
 import org.eclipse.jetty.server.Handler;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.handler.DefaultHandler;
@@ -57,7 +56,6 @@ protected void run() {
             if (resourcePath == null) {
                 // Register and map the dispatcher servlet
                 final ServletHolder holder = new ServletHolder(new CXFNonSpringJaxrsServlet());
-                holder.setInitParameter(CXFNonSpringJaxrsServlet.TRANSPORT_ID, SseHttpTransportFactory.TRANSPORT_ID);
                 holder.setInitParameter("jaxrs.serviceClasses", BookStore.class.getName());
                 holder.setInitParameter("jaxrs.providers", JacksonJsonProvider.class.getName());
                 final ServletContextHandler context = new ServletContextHandler();
diff --git a/systests/rs-sse/rs-sse-jetty/src/test/resources/jaxrs_sse/WEB-INF/web.xml b/systests/rs-sse/rs-sse-jetty/src/test/resources/jaxrs_sse/WEB-INF/web.xml
index 64746acb263..8133ebf4ca4 100644
--- a/systests/rs-sse/rs-sse-jetty/src/test/resources/jaxrs_sse/WEB-INF/web.xml
+++ b/systests/rs-sse/rs-sse-jetty/src/test/resources/jaxrs_sse/WEB-INF/web.xml
@@ -7,10 +7,6 @@
 		<servlet-class>org.apache.cxf.jaxrs.servlet.CXFNonSpringJaxrsServlet</servlet-class>    
 		<load-on-startup>1</load-on-startup>
 		<async-supported>true</async-supported>
-		<init-param>
-			<param-name>transportId</param-name>
-			<param-value>http://cxf.apache.org/transports/http/sse</param-value>
-		</init-param>
 		<init-param>
 			<param-name>javax.ws.rs.Application</param-name>
 			<param-value>org.apache.cxf.systest.jaxrs.sse.SseApplication</param-value>
diff --git a/systests/rs-sse/rs-sse-tomcat/src/test/java/org/apache/cxf/systest/jaxrs/sse/tomcat/AbstractTomcatServer.java b/systests/rs-sse/rs-sse-tomcat/src/test/java/org/apache/cxf/systest/jaxrs/sse/tomcat/AbstractTomcatServer.java
index ded2a64c4b1..799a13478a8 100644
--- a/systests/rs-sse/rs-sse-tomcat/src/test/java/org/apache/cxf/systest/jaxrs/sse/tomcat/AbstractTomcatServer.java
+++ b/systests/rs-sse/rs-sse-tomcat/src/test/java/org/apache/cxf/systest/jaxrs/sse/tomcat/AbstractTomcatServer.java
@@ -30,7 +30,6 @@
 import org.apache.cxf.jaxrs.servlet.CXFNonSpringJaxrsServlet;
 import org.apache.cxf.systest.jaxrs.sse.BookStore;
 import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
-import org.apache.cxf.transport.sse.SseHttpTransportFactory;
 
 public abstract class AbstractTomcatServer extends AbstractBusTestServerBase {
 
@@ -60,8 +59,6 @@ protected void run() {
             if (resourcePath == null) {
                 final Context context = server.addContext("/", base.getAbsolutePath());
                 final Wrapper cxfServlet = Tomcat.addServlet(context, "cxfServlet", new CXFNonSpringJaxrsServlet());
-                cxfServlet.addInitParameter(CXFNonSpringJaxrsServlet.TRANSPORT_ID,
-                    SseHttpTransportFactory.TRANSPORT_ID);
                 cxfServlet.addInitParameter("jaxrs.serviceClasses", BookStore.class.getName());
                 cxfServlet.addInitParameter("jaxrs.providers", JacksonJsonProvider.class.getName());
                 cxfServlet.setAsyncSupported(true);
diff --git a/systests/rs-sse/rs-sse-tomcat/src/test/resources/jaxrs_sse/WEB-INF/web.xml b/systests/rs-sse/rs-sse-tomcat/src/test/resources/jaxrs_sse/WEB-INF/web.xml
index 64746acb263..8133ebf4ca4 100644
--- a/systests/rs-sse/rs-sse-tomcat/src/test/resources/jaxrs_sse/WEB-INF/web.xml
+++ b/systests/rs-sse/rs-sse-tomcat/src/test/resources/jaxrs_sse/WEB-INF/web.xml
@@ -7,10 +7,6 @@
 		<servlet-class>org.apache.cxf.jaxrs.servlet.CXFNonSpringJaxrsServlet</servlet-class>    
 		<load-on-startup>1</load-on-startup>
 		<async-supported>true</async-supported>
-		<init-param>
-			<param-name>transportId</param-name>
-			<param-value>http://cxf.apache.org/transports/http/sse</param-value>
-		</init-param>
 		<init-param>
 			<param-name>javax.ws.rs.Application</param-name>
 			<param-value>org.apache.cxf.systest.jaxrs.sse.SseApplication</param-value>
diff --git a/systests/rs-sse/rs-sse-undertow/src/test/java/org/apache/cxf/systest/jaxrs/sse/undertow/AbstractUndertowServer.java b/systests/rs-sse/rs-sse-undertow/src/test/java/org/apache/cxf/systest/jaxrs/sse/undertow/AbstractUndertowServer.java
index c6f0a8dbf57..c24dfef1afb 100644
--- a/systests/rs-sse/rs-sse-undertow/src/test/java/org/apache/cxf/systest/jaxrs/sse/undertow/AbstractUndertowServer.java
+++ b/systests/rs-sse/rs-sse-undertow/src/test/java/org/apache/cxf/systest/jaxrs/sse/undertow/AbstractUndertowServer.java
@@ -24,7 +24,6 @@
 import org.apache.cxf.jaxrs.servlet.CXFNonSpringJaxrsServlet;
 import org.apache.cxf.systest.jaxrs.sse.BookStore;
 import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
-import org.apache.cxf.transport.sse.SseHttpTransportFactory;
 
 import io.undertow.Handlers;
 import io.undertow.Undertow;
@@ -54,7 +53,6 @@ protected void run() {
                 .setDeploymentName("sse-test")
                 .addServlets(
                     servlet("MessageServlet", CXFNonSpringJaxrsServlet.class)
-                        .addInitParam(CXFNonSpringJaxrsServlet.TRANSPORT_ID, SseHttpTransportFactory.TRANSPORT_ID)
                         .addInitParam("jaxrs.providers", JacksonJsonProvider.class.getName())
                         .addInitParam("jaxrs.serviceClasses", BookStore.class.getName())
                         .setAsyncSupported(true)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services