You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2018/10/17 02:52:21 UTC

[cxf] branch 3.2.x-fixes updated: CXF-7874: JAX-RS SSE Leaking SSESink (s)

This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch 3.2.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git


The following commit(s) were added to refs/heads/3.2.x-fixes by this push:
     new 829fbc6  CXF-7874: JAX-RS SSE Leaking SSESink (s)
829fbc6 is described below

commit 829fbc63033eb64bc3aa5847b3717fff75ba05ea
Author: reta <dr...@gmail.com>
AuthorDate: Tue Oct 16 22:23:18 2018 -0400

    CXF-7874: JAX-RS SSE Leaking SSESink (s)
---
 .../org/apache/cxf/jaxrs/sse/SseEventSinkImpl.java | 132 +++++++++++++++++--
 .../jaxrs/sse/client/InboundSseEventProcessor.java |   4 +
 .../cxf/systest/jaxrs/sse/AbstractSseTest.java     |  66 ++++++++++
 .../systest/jaxrs/sse/BookBroadcasterStats.java    |  77 +++++++++++
 .../apache/cxf/systest/jaxrs/sse/BookStore.java    |  14 +-
 .../apache/cxf/systest/jaxrs/sse/BookStore2.java   |  14 +-
 .../jaxrs/sse/BookStoreClientCloseable.java        | 146 +++++++++++++++++++++
 .../jaxrs/sse/tomcat/TomcatEmbeddedTest.java       |   4 +
 .../systest/jaxrs/sse/tomcat/TomcatWarTest.java    |   4 +
 9 files changed, 429 insertions(+), 32 deletions(-)

diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventSinkImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventSinkImpl.java
index f39de07..1d5ea6d 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventSinkImpl.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventSinkImpl.java
@@ -19,6 +19,7 @@
 
 package org.apache.cxf.jaxrs.sse;
 
+import java.io.IOException;
 import java.lang.annotation.Annotation;
 import java.util.Queue;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -26,10 +27,13 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.LockSupport;
 import java.util.logging.Logger;
 
 import javax.servlet.AsyncContext;
+import javax.servlet.AsyncEvent;
+import javax.servlet.AsyncListener;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.ext.MessageBodyWriter;
 import javax.ws.rs.sse.OutboundSseEvent;
@@ -47,6 +51,8 @@ public class SseEventSinkImpl implements SseEventSink {
     private final Queue<QueuedEvent> buffer;
     private final AtomicBoolean closed = new AtomicBoolean(false);
     private final AtomicBoolean dispatching = new AtomicBoolean(false);
+    private final AtomicReference<Throwable> throwable = new AtomicReference<>();
+    private final AtomicBoolean completed = new AtomicBoolean(false);
 
     public SseEventSinkImpl(final MessageBodyWriter<OutboundSseEvent> writer, 
             final AsyncResponse async, final AsyncContext ctx) {
@@ -61,15 +67,43 @@ public class SseEventSinkImpl implements SseEventSink {
         }
 
         ctx.getResponse().setContentType(OutboundSseEventBodyWriter.SERVER_SENT_EVENTS);
+        ctx.addListener(new AsyncListener() {
+            @Override
+            public void onComplete(AsyncEvent event) throws IOException {
+                // This callback should be called when dequeue() has encountered an
+                // error during the execution and is forced to complete the context.
+                close();
+            }
+
+            @Override
+            public void onTimeout(AsyncEvent event) throws IOException {
+            }
+
+            @Override
+            public void onError(AsyncEvent event) throws IOException {
+                // In case of Tomcat, the context is closed automatically when client closes
+                // the connection.
+                if (throwable.get() != null || throwable.compareAndSet(null, event.getThrowable())) {
+                    // The container reported an error for this context, so close the sink.
+                    close();
+                }
+            }
+
+            @Override
+            public void onStartAsync(AsyncEvent event) throws IOException {
+            }
+        });
     }
 
     public AsyncContext getAsyncContext() {
         return ctx;
     }
-
+    
     @Override
     public void close() {
         if (closed.compareAndSet(false, true)) {
+            LOG.fine("Closing SSE sink now");
+            
             // In case we are still dispatching, give the events the chance to be
             // sent over to the consumers. The good example would be sent(event) call,
             // immediately followed by the close() call.
@@ -77,10 +111,30 @@ public class SseEventSinkImpl implements SseEventSink {
                 LOG.warning("There are still SSE events the queue which may not be delivered (closing now)");
             }
             
-            try {
-                ctx.complete();
-            } catch (final IllegalStateException ex) {
-                LOG.warning("Failed to close the AsyncContext cleanly: " + ex.getMessage());
+            if (completed.compareAndSet(false, true)) {
+                try {
+                    // In case of Tomcat, the context may be already closed (f.e. due to error),
+                    // in this case request is set to null.
+                    if (ctx.getRequest() != null) {
+                        LOG.fine("Completing the AsyncContext");
+                        ctx.complete();
+                    }
+                } catch (final IllegalStateException ex) {
+                    LOG.warning("Failed to close the AsyncContext cleanly: " + ex.getMessage());
+                }
+            }
+            
+            // Complete all the accepted but not dispatched send request with the
+            // error (if any) or signal that sink has been closed already.
+            Throwable ex = throwable.get();
+            if (ex == null) {
+                ex = new IllegalStateException("The sink has been already closed");
+            }
+            
+            QueuedEvent queuedEvent = buffer.poll();
+            while (queuedEvent != null) {
+                queuedEvent.completion.completeExceptionally(ex);
+                queuedEvent = buffer.poll();
             }
         }
     }
@@ -106,7 +160,10 @@ public class SseEventSinkImpl implements SseEventSink {
         final CompletableFuture<?> future = new CompletableFuture<>();
 
         if (!closed.get() && writer != null) {
-            if (buffer.offer(new QueuedEvent(event, future))) {
+            final Throwable ex = throwable.get(); 
+            if (ex != null) {
+                future.completeExceptionally(ex);
+            } else if (buffer.offer(new QueuedEvent(event, future))) {
                 if (dispatching.compareAndSet(false, true)) {
                     ctx.start(this::dequeue);
                 }
@@ -122,30 +179,77 @@ public class SseEventSinkImpl implements SseEventSink {
         return future;
     }
 
+    /**
+     * Processes the buffered events and sends them off to the output channel. There is
+     * a special handling for the IOException, which forces the sink to switch to closed 
+     * state: 
+     *   - when the IOException is detected, the AsyncContext is forcibly closed (unless
+     *     it is already closed like in case of the Tomcat)
+     *   - all unsent events are completed exceptionally
+     *   - all unscheduled events are completed exceptionally (please see the close() method)
+     *   
+     */
     private void dequeue() {
+        Throwable error = throwable.get();
+        
         try {
             while (true) {
-                final QueuedEvent qeuedEvent = buffer.poll();
+                final QueuedEvent queuedEvent = buffer.poll();
                 
                 // Nothing queued, release the thread
-                if (qeuedEvent == null) {
+                if (queuedEvent == null) {
                     break;
                 }
                 
-                final OutboundSseEvent event = qeuedEvent.event;
-                final CompletableFuture<?> future = qeuedEvent.completion;
+                final OutboundSseEvent event = queuedEvent.event;
+                final CompletableFuture<?> future = queuedEvent.completion;
     
                 try {
-                    writer.writeTo(event, event.getClass(), event.getGenericType(), EMPTY_ANNOTATIONS,
-                        event.getMediaType(), null, ctx.getResponse().getOutputStream());
-                    ctx.getResponse().flushBuffer();
-                    future.complete(null);
+                    if (error == null) {
+                        LOG.fine("Dispatching SSE event over the wire");
+                        
+                        writer.writeTo(event, event.getClass(), event.getGenericType(), EMPTY_ANNOTATIONS,
+                            event.getMediaType(), null, ctx.getResponse().getOutputStream());
+                        ctx.getResponse().flushBuffer();
+                        
+                        LOG.fine("Completing the future successfully");
+                        future.complete(null);
+                    } else {
+                        LOG.fine("Completing the future unsuccessfully (error enountered previously)");
+                        future.completeExceptionally(error);
+                    }
                 } catch (final Exception ex) {
+                    // Very likely the connection is closed by the client (but we cannot
+                    // detect it for sure, this is container-specific).
+                    if (ex instanceof IOException) {
+                        error = (IOException)ex;
+                    }
+                    
+                    LOG.fine("Completing the future unsuccessfully (error enountered)");
                     future.completeExceptionally(ex);
                 }
             }
         } finally {
+            final boolean shouldComplete = (error != null) && throwable.compareAndSet(null, error);
             dispatching.set(false);
+            
+            // Ideally, we should be rethrowing the exception here (error) and handle
+            // it inside the onError() callback. However, most of the servlet containers
+            // do not handle this case properly (and onError() is not called). 
+            if (shouldComplete && completed.compareAndSet(false, true)) {
+                LOG.warning("Prematurely completing the AsyncContext due to error encountered: " + error);
+                // In case of Tomcat, the context is closed automatically when client closes
+                // the connection and onError callback will be called (in this case request 
+                // is set to null).
+                if (ctx.getRequest() != null) {
+                    LOG.fine("Completing the AsyncContext");
+                    try {
+                        ctx.complete();
+                    } catch (final IllegalStateException ex) {
+                        LOG.warning("Failed to close the AsyncContext cleanly: " + ex.getMessage());
+                    }
+                }
+            }
         }
     }
 
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
index 8e02e71..c5a2b0e 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
@@ -28,11 +28,13 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
 
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.sse.InboundSseEvent;
 
+import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.common.util.StringUtils;
 import org.apache.cxf.endpoint.Endpoint;
 import org.apache.cxf.jaxrs.client.ClientProviderFactory;
@@ -44,6 +46,7 @@ public class InboundSseEventProcessor {
     public static final String SERVER_SENT_EVENTS = "text/event-stream";
     public static final MediaType SERVER_SENT_EVENTS_TYPE = MediaType.valueOf(SERVER_SENT_EVENTS);
 
+    private static final Logger LOG = LogUtils.getL7dLogger(InboundSseEventProcessor.class);
     private static final String COMMENT = ": ";
     private static final String EVENT = "event: ";
     private static final String ID = "id: ";
@@ -116,6 +119,7 @@ public class InboundSseEventProcessor {
             }
 
             if (response != null) {
+                LOG.fine("Closing the response");
                 response.close();
             }
 
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
index 1e32b4b..5d6b8f4 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
@@ -40,6 +40,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import org.junit.Test;
 
 import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.hasItems;
 
 public abstract class AbstractSseTest extends AbstractSseBaseTest {
@@ -200,6 +201,71 @@ public abstract class AbstractSseTest extends AbstractSseBaseTest {
         );
     }
 
+    @Test
+    public void testClientClosesEventSource() throws InterruptedException {
+        final WebTarget target = createWebTarget("/rest/api/bookstore/client-closes-connection/sse/0");
+        final Collection<Book> books = new ArrayList<>();
+
+        try (SseEventSource eventSource = SseEventSource.target(target).build()) {
+            eventSource.register(collect(books), System.out::println);
+            eventSource.open();
+            
+            // wait for single event, close before server sends other 3
+            awaitEvents(200, books, 1);
+            
+            // Confirm to the server that the first event has been received
+            final Response r = 
+                createWebClient("/rest/api/bookstore/client-closes-connection/received", MediaType.APPLICATION_JSON)
+                    .put(null);
+            assertThat(r.getStatus(), equalTo(204));
+            
+            assertThat(eventSource.close(1, TimeUnit.SECONDS), equalTo(true));
+        }
+
+        // Easing the test verification here, it does not work well for Atm + Jetty
+        assertThat(books,
+            hasItems(
+                new Book("New Book #1", 1)
+            )
+        );
+        
+        // Notify the server that the client has closed the event source
+        final Response r = 
+            createWebClient("/rest/api/bookstore/client-closes-connection/closed", MediaType.APPLICATION_JSON)
+                .put(null);
+        assertThat(r.getStatus(), equalTo(204));
+
+        // Give server some time to finish up the sink
+        Thread.sleep(2000);
+        
+        // Only two out of 4 messages should be delivered, others should be discarded
+        final BookBroadcasterStats stats = 
+            createWebClient("/rest/api/bookstore/client-closes-connection/stats", MediaType.APPLICATION_JSON)
+                .get()
+                .readEntity(BookBroadcasterStats.class);
+        
+        // Tomcat will feedback through onError callback, others through onComplete
+        assertThat(stats.isErrored(), equalTo(supportsErrorPropagation()));
+        // The sink should be in closed state
+        assertThat(stats.isWasClosed(), equalTo(true));
+        // The onClose callback should be called
+        assertThat(stats.isClosed(), equalTo(true));
+
+        // It is very hard to get the predictable match here, but at most
+        // 2 events could get through before the client's connection drop off
+        assertTrue(stats.getCompleted() == 2 || stats.getCompleted() == 1);
+    }
+    
+    /**
+     * Jetty / Undertow do not propagate errors from the runnable passed to 
+     * AsyncContext::start() up to the AsyncListener::onError(). Tomcat however 
+     * does it.
+     * @return true if the container propagates such errors to onError(), false otherwise
+     */
+    protected boolean supportsErrorPropagation() {
+        return false;
+    }
+
     private static Consumer<InboundSseEvent> collect(final Collection< Book > books) {
         return event -> books.add(event.readData(Book.class, MediaType.APPLICATION_JSON_TYPE));
     }
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookBroadcasterStats.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookBroadcasterStats.java
new file mode 100644
index 0000000..d82a8a5
--- /dev/null
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookBroadcasterStats.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.systest.jaxrs.sse;
+
+public class BookBroadcasterStats {
+    private int completed;
+    private boolean closed;
+    private boolean errored;
+    private boolean wasClosed;
+    
+    public synchronized void inc() {
+        setCompleted(getCompleted() + 1);
+    }
+
+    public synchronized void reset() {
+        setCompleted(0);
+        setClosed(false);
+        setErrored(false);
+        setWasClosed(false);
+    }
+    
+    public synchronized void closed() {
+        setClosed(true);
+    }
+    
+    public synchronized void errored() {
+        setErrored(true);
+    }
+
+    public int getCompleted() {
+        return completed;
+    }
+
+    public void setCompleted(int completed) {
+        this.completed = completed;
+    }
+
+    public boolean isClosed() {
+        return closed;
+    }
+
+    public void setClosed(boolean closed) {
+        this.closed = closed;
+    }
+
+    public boolean isErrored() {
+        return errored;
+    }
+
+    public void setErrored(boolean errored) {
+        this.errored = errored;
+    }
+
+    public boolean isWasClosed() {
+        return wasClosed;
+    }
+
+    public void setWasClosed(boolean wasClosed) {
+        this.wasClosed = wasClosed;
+    }
+}
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
index ad2387f..a32f731 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
@@ -34,7 +34,6 @@ import javax.ws.rs.Produces;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
-import javax.ws.rs.sse.OutboundSseEvent;
 import javax.ws.rs.sse.OutboundSseEvent.Builder;
 import javax.ws.rs.sse.Sse;
 import javax.ws.rs.sse.SseBroadcaster;
@@ -44,7 +43,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @Path("/api/bookstore")
-public class BookStore {
+public class BookStore extends BookStoreClientCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(BookStore.class);
 
     private final CountDownLatch latch = new CountDownLatch(2);
@@ -150,12 +149,9 @@ public class BookStore {
             LOG.error("Wait has been interrupted", ex);
         }
     }
-
-    private static OutboundSseEvent createStatsEvent(final OutboundSseEvent.Builder builder, final int eventId) {
-        return builder
-            .id(Integer.toString(eventId))
-            .data(Book.class, new Book("New Book #" + eventId, eventId))
-            .mediaType(MediaType.APPLICATION_JSON_TYPE)
-            .build();
+    
+    @Override
+    protected Sse getSse() {
+        return sse;
     }
 }
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
index f78c5ce..d4f0158 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
@@ -34,7 +34,6 @@ import javax.ws.rs.Produces;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
-import javax.ws.rs.sse.OutboundSseEvent;
 import javax.ws.rs.sse.OutboundSseEvent.Builder;
 import javax.ws.rs.sse.Sse;
 import javax.ws.rs.sse.SseBroadcaster;
@@ -44,7 +43,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @Path("/api/bookstore")
-public class BookStore2 {
+public class BookStore2 extends BookStoreClientCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(BookStore2.class);
 
     private final CountDownLatch latch = new CountDownLatch(2);
@@ -149,12 +148,9 @@ public class BookStore2 {
             LOG.error("Wait has been interrupted", ex);
         }
     }
-
-    private static OutboundSseEvent createStatsEvent(final OutboundSseEvent.Builder builder, final int eventId) {
-        return builder
-            .id(Integer.toString(eventId))
-            .data(Book.class, new Book("New Book #" + eventId, eventId))
-            .mediaType(MediaType.APPLICATION_JSON_TYPE)
-            .build();
+    
+    @Override
+    protected Sse getSse() {
+        return sse;
     }
 }
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStoreClientCloseable.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStoreClientCloseable.java
new file mode 100644
index 0000000..b9b599d
--- /dev/null
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStoreClientCloseable.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.systest.jaxrs.sse;
+
+import java.util.concurrent.Phaser;
+
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.HeaderParam;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.sse.OutboundSseEvent;
+import javax.ws.rs.sse.OutboundSseEvent.Builder;
+import javax.ws.rs.sse.Sse;
+import javax.ws.rs.sse.SseBroadcaster;
+import javax.ws.rs.sse.SseEventSink;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+abstract class BookStoreClientCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(BookStore.class);
+    
+    private final BookBroadcasterStats stats = new BookBroadcasterStats(); 
+    private final Phaser phaser = new Phaser(2);
+    
+    protected abstract Sse getSse();
+
+    @GET
+    @Path("client-closes-connection/sse/{id}")
+    @Produces(MediaType.SERVER_SENT_EVENTS)
+    public void clientCloseConnection(@Context SseEventSink sink, @PathParam("id") final String idIgnore,
+        @HeaderParam(HttpHeaders.LAST_EVENT_ID_HEADER) @DefaultValue("0") final String lastEventId) {
+
+        stats.reset();
+        new Thread(() -> {
+            try {
+                final Integer id = Integer.valueOf(lastEventId);
+                final Builder builder = getSse().newEventBuilder();
+
+                SseBroadcaster localBroadcaster = getSse().newBroadcaster();
+                localBroadcaster.onError((sseEventSink, throwable) -> stats.errored());
+                localBroadcaster.onClose(sseEventSink -> stats.closed());
+                localBroadcaster.register(sink);
+
+                localBroadcaster.broadcast(createStatsEvent(builder.name("book"), id + 1))
+                    .whenComplete((r, ex) -> stats.inc());
+                
+                // Await client to confirm that it got the event (PUT /client-closes-connection/received)
+                phaser.arriveAndAwaitAdvance();
+                
+                Thread.sleep(500);
+                localBroadcaster.broadcast(createStatsEvent(builder.name("book"), id + 2))
+                    .whenComplete((r, ex) -> { 
+                        // we expect exception here
+                        if (ex == null && !sink.isClosed()) {
+                            stats.inc();
+                        }   
+                    });
+
+                // Await client to confirm the connection is closed (PUT /client-closes-connection/closed)
+                phaser.arriveAndAwaitAdvance();
+                
+                // This event should complete exceptionally since SseEventSource should be 
+                // closed already.
+                Thread.sleep(500);
+                localBroadcaster.broadcast(createStatsEvent(builder.name("book"), id + 3))
+                    .whenComplete((r, ex) -> { 
+                        // we expect exception here
+                        if (ex == null && !sink.isClosed()) {
+                            stats.inc();
+                        }   
+                    });
+                
+                // This event should complete immediately since the sink has been removed
+                // from the broadcaster (closed).
+                Thread.sleep(500);
+                localBroadcaster.broadcast(createStatsEvent(builder.name("book"), id + 4))
+                    .whenComplete((r, ex) -> {
+                        // we expect the sink to be closed at this point
+                        if (ex != null || !sink.isClosed()) {
+                            stats.inc();
+                        }   
+                    });
+
+                stats.setWasClosed(sink.isClosed());
+                phaser.arriveAndDeregister();
+                
+                sink.close();
+            } catch (final InterruptedException ex) {
+                LOG.error("Communication error", ex);
+            }
+        }
+        ).start();
+    }
+    
+    @PUT
+    @Path("client-closes-connection/received")
+    @Produces(MediaType.APPLICATION_JSON)
+    public void received() {
+        phaser.arriveAndAwaitAdvance();
+    }
+    
+    @PUT
+    @Path("client-closes-connection/closed")
+    @Produces(MediaType.APPLICATION_JSON)
+    public void closed() {
+        phaser.arriveAndDeregister();
+    }
+    
+    @GET
+    @Path("client-closes-connection/stats")
+    @Produces(MediaType.APPLICATION_JSON)
+    public BookBroadcasterStats stats() {
+        return stats;
+    }
+    
+    protected static OutboundSseEvent createStatsEvent(final OutboundSseEvent.Builder builder, final int eventId) {
+        return builder
+            .id(Integer.toString(eventId))
+            .data(Book.class, new Book("New Book #" + eventId, eventId))
+            .mediaType(MediaType.APPLICATION_JSON_TYPE)
+            .build();
+    }
+}
diff --git a/systests/rs-sse/rs-sse-tomcat/src/test/java/org/apache/cxf/systest/jaxrs/sse/tomcat/TomcatEmbeddedTest.java b/systests/rs-sse/rs-sse-tomcat/src/test/java/org/apache/cxf/systest/jaxrs/sse/tomcat/TomcatEmbeddedTest.java
index 9621b6d..e655391 100644
--- a/systests/rs-sse/rs-sse-tomcat/src/test/java/org/apache/cxf/systest/jaxrs/sse/tomcat/TomcatEmbeddedTest.java
+++ b/systests/rs-sse/rs-sse-tomcat/src/test/java/org/apache/cxf/systest/jaxrs/sse/tomcat/TomcatEmbeddedTest.java
@@ -47,4 +47,8 @@ public class TomcatEmbeddedTest extends AbstractSseTest {
         return EmbeddedTomcatServer.PORT;
     }
 
+    @Override
+    protected boolean supportsErrorPropagation() {
+        return true;
+    }
 }
diff --git a/systests/rs-sse/rs-sse-tomcat/src/test/java/org/apache/cxf/systest/jaxrs/sse/tomcat/TomcatWarTest.java b/systests/rs-sse/rs-sse-tomcat/src/test/java/org/apache/cxf/systest/jaxrs/sse/tomcat/TomcatWarTest.java
index c58fe4e..58f6194 100644
--- a/systests/rs-sse/rs-sse-tomcat/src/test/java/org/apache/cxf/systest/jaxrs/sse/tomcat/TomcatWarTest.java
+++ b/systests/rs-sse/rs-sse-tomcat/src/test/java/org/apache/cxf/systest/jaxrs/sse/tomcat/TomcatWarTest.java
@@ -47,4 +47,8 @@ public class TomcatWarTest extends AbstractSseTest {
         return EmbeddedTomcatServer.PORT;
     }
 
+    @Override
+    protected boolean supportsErrorPropagation() {
+        return true;
+    }
 }