You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2018/10/17 02:52:21 UTC
[cxf] branch 3.2.x-fixes updated: CXF-7874: JAX-RS SSE Leaking
SSESink (s)
This is an automated email from the ASF dual-hosted git repository.
reta pushed a commit to branch 3.2.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git
The following commit(s) were added to refs/heads/3.2.x-fixes by this push:
new 829fbc6 CXF-7874: JAX-RS SSE Leaking SSESink (s)
829fbc6 is described below
commit 829fbc63033eb64bc3aa5847b3717fff75ba05ea
Author: reta <dr...@gmail.com>
AuthorDate: Tue Oct 16 22:23:18 2018 -0400
CXF-7874: JAX-RS SSE Leaking SSESink (s)
---
.../org/apache/cxf/jaxrs/sse/SseEventSinkImpl.java | 132 +++++++++++++++++--
.../jaxrs/sse/client/InboundSseEventProcessor.java | 4 +
.../cxf/systest/jaxrs/sse/AbstractSseTest.java | 66 ++++++++++
.../systest/jaxrs/sse/BookBroadcasterStats.java | 77 +++++++++++
.../apache/cxf/systest/jaxrs/sse/BookStore.java | 14 +-
.../apache/cxf/systest/jaxrs/sse/BookStore2.java | 14 +-
.../jaxrs/sse/BookStoreClientCloseable.java | 146 +++++++++++++++++++++
.../jaxrs/sse/tomcat/TomcatEmbeddedTest.java | 4 +
.../systest/jaxrs/sse/tomcat/TomcatWarTest.java | 4 +
9 files changed, 429 insertions(+), 32 deletions(-)
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventSinkImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventSinkImpl.java
index f39de07..1d5ea6d 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventSinkImpl.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventSinkImpl.java
@@ -19,6 +19,7 @@
package org.apache.cxf.jaxrs.sse;
+import java.io.IOException;
import java.lang.annotation.Annotation;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
@@ -26,10 +27,13 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.logging.Logger;
import javax.servlet.AsyncContext;
+import javax.servlet.AsyncEvent;
+import javax.servlet.AsyncListener;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.ext.MessageBodyWriter;
import javax.ws.rs.sse.OutboundSseEvent;
@@ -47,6 +51,8 @@ public class SseEventSinkImpl implements SseEventSink {
private final Queue<QueuedEvent> buffer;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final AtomicBoolean dispatching = new AtomicBoolean(false);
+ private final AtomicReference<Throwable> throwable = new AtomicReference<>();
+ private final AtomicBoolean completed = new AtomicBoolean(false);
public SseEventSinkImpl(final MessageBodyWriter<OutboundSseEvent> writer,
final AsyncResponse async, final AsyncContext ctx) {
@@ -61,15 +67,43 @@ public class SseEventSinkImpl implements SseEventSink {
}
ctx.getResponse().setContentType(OutboundSseEventBodyWriter.SERVER_SENT_EVENTS);
+ ctx.addListener(new AsyncListener() {
+ @Override
+ public void onComplete(AsyncEvent event) throws IOException {
+ // This callback should be called when dequeue() has encountered an
+ // error during the execution and is forced to complete the context.
+ close();
+ }
+
+ @Override
+ public void onTimeout(AsyncEvent event) throws IOException {
+ }
+
+ @Override
+ public void onError(AsyncEvent event) throws IOException {
+ // In case of Tomcat, the context is closed automatically when client closes
+ // the connection.
+ if (throwable.get() != null || throwable.compareAndSet(null, event.getThrowable())) {
+ // This callback should be called when dequeue() has encountered an
+ close();
+ }
+ }
+
+ @Override
+ public void onStartAsync(AsyncEvent event) throws IOException {
+ }
+ });
}
public AsyncContext getAsyncContext() {
return ctx;
}
-
+
@Override
public void close() {
if (closed.compareAndSet(false, true)) {
+ LOG.fine("Closing SSE sink now");
+
// In case we are still dispatching, give the events the chance to be
// sent over to the consumers. The good example would be sent(event) call,
// immediately followed by the close() call.
@@ -77,10 +111,30 @@ public class SseEventSinkImpl implements SseEventSink {
LOG.warning("There are still SSE events the queue which may not be delivered (closing now)");
}
- try {
- ctx.complete();
- } catch (final IllegalStateException ex) {
- LOG.warning("Failed to close the AsyncContext cleanly: " + ex.getMessage());
+ if (completed.compareAndSet(false, true)) {
+ try {
+ // In case of Tomcat, the context may be already closed (f.e. due to error),
+ // in this case request is set to null.
+ if (ctx.getRequest() != null) {
+ LOG.fine("Completing the AsyncContext");
+ ctx.complete();
+ }
+ } catch (final IllegalStateException ex) {
+ LOG.warning("Failed to close the AsyncContext cleanly: " + ex.getMessage());
+ }
+ }
+
+ // Complete all the accepted but not dispatched send request with the
+ // error (if any) or signal that sink has been closed already.
+ Throwable ex = throwable.get();
+ if (ex == null) {
+ ex = new IllegalStateException("The sink has been already closed");
+ }
+
+ QueuedEvent queuedEvent = buffer.poll();
+ while (queuedEvent != null) {
+ queuedEvent.completion.completeExceptionally(ex);
+ queuedEvent = buffer.poll();
}
}
}
@@ -106,7 +160,10 @@ public class SseEventSinkImpl implements SseEventSink {
final CompletableFuture<?> future = new CompletableFuture<>();
if (!closed.get() && writer != null) {
- if (buffer.offer(new QueuedEvent(event, future))) {
+ final Throwable ex = throwable.get();
+ if (ex != null) {
+ future.completeExceptionally(ex);
+ } else if (buffer.offer(new QueuedEvent(event, future))) {
if (dispatching.compareAndSet(false, true)) {
ctx.start(this::dequeue);
}
@@ -122,30 +179,77 @@ public class SseEventSinkImpl implements SseEventSink {
return future;
}
+ /**
+ * Processes the buffered events and sends the off to the output channel. There is
+ * a special handling for the IOException, which forces the sink to switch to closed
+ * state:
+ * - when the IOException is detected, the AsyncContext is forcebly closed (unless
+ * it is already closed like in case of the Tomcat)
+ * - all unsent events are completed exceptionally
+ * - all unscheduled events are completed exceptionally (see please close() method)
+ *
+ */
private void dequeue() {
+ Throwable error = throwable.get();
+
try {
while (true) {
- final QueuedEvent qeuedEvent = buffer.poll();
+ final QueuedEvent queuedEvent = buffer.poll();
// Nothing queued, release the thread
- if (qeuedEvent == null) {
+ if (queuedEvent == null) {
break;
}
- final OutboundSseEvent event = qeuedEvent.event;
- final CompletableFuture<?> future = qeuedEvent.completion;
+ final OutboundSseEvent event = queuedEvent.event;
+ final CompletableFuture<?> future = queuedEvent.completion;
try {
- writer.writeTo(event, event.getClass(), event.getGenericType(), EMPTY_ANNOTATIONS,
- event.getMediaType(), null, ctx.getResponse().getOutputStream());
- ctx.getResponse().flushBuffer();
- future.complete(null);
+ if (error == null) {
+ LOG.fine("Dispatching SSE event over the wire");
+
+ writer.writeTo(event, event.getClass(), event.getGenericType(), EMPTY_ANNOTATIONS,
+ event.getMediaType(), null, ctx.getResponse().getOutputStream());
+ ctx.getResponse().flushBuffer();
+
+ LOG.fine("Completing the future successfully");
+ future.complete(null);
+ } else {
+ LOG.fine("Completing the future unsuccessfully (error enountered previously)");
+ future.completeExceptionally(error);
+ }
} catch (final Exception ex) {
+ // Very likely the connection is closed by the client (but we cannot
+ // detect if for sure, container-specific).
+ if (ex instanceof IOException) {
+ error = (IOException)ex;
+ }
+
+ LOG.fine("Completing the future unsuccessfully (error enountered)");
future.completeExceptionally(ex);
}
}
} finally {
+ final boolean shouldComplete = (error != null) && throwable.compareAndSet(null, error);
dispatching.set(false);
+
+ // Ideally, we should be rethrowing the exception here (error) and handle
+ // it inside the onError() callback. However, most of the servlet containers
+ // do not handle this case properly (and onError() is not called).
+ if (shouldComplete && completed.compareAndSet(false, true)) {
+ LOG.warning("Prematurely completing the AsyncContext due to error encountered: " + error);
+ // In case of Tomcat, the context is closed automatically when client closes
+ // the connection and onError callback will be called (in this case request
+ // is set to null).
+ if (ctx.getRequest() != null) {
+ LOG.fine("Completing the AsyncContext");
+ try {
+ ctx.complete();
+ } catch (final IllegalStateException ex) {
+ LOG.warning("Failed to close the AsyncContext cleanly: " + ex.getMessage());
+ }
+ }
+ }
}
}
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
index 8e02e71..c5a2b0e 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
@@ -28,11 +28,13 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.sse.InboundSseEvent;
+import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.common.util.StringUtils;
import org.apache.cxf.endpoint.Endpoint;
import org.apache.cxf.jaxrs.client.ClientProviderFactory;
@@ -44,6 +46,7 @@ public class InboundSseEventProcessor {
public static final String SERVER_SENT_EVENTS = "text/event-stream";
public static final MediaType SERVER_SENT_EVENTS_TYPE = MediaType.valueOf(SERVER_SENT_EVENTS);
+ private static final Logger LOG = LogUtils.getL7dLogger(InboundSseEventProcessor.class);
private static final String COMMENT = ": ";
private static final String EVENT = "event: ";
private static final String ID = "id: ";
@@ -116,6 +119,7 @@ public class InboundSseEventProcessor {
}
if (response != null) {
+ LOG.fine("Closing the response");
response.close();
}
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
index 1e32b4b..5d6b8f4 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
@@ -40,6 +40,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItems;
public abstract class AbstractSseTest extends AbstractSseBaseTest {
@@ -200,6 +201,71 @@ public abstract class AbstractSseTest extends AbstractSseBaseTest {
);
}
+ @Test
+ public void testClientClosesEventSource() throws InterruptedException {
+ final WebTarget target = createWebTarget("/rest/api/bookstore/client-closes-connection/sse/0");
+ final Collection<Book> books = new ArrayList<>();
+
+ try (SseEventSource eventSource = SseEventSource.target(target).build()) {
+ eventSource.register(collect(books), System.out::println);
+ eventSource.open();
+
+ // wait for single event, close before server sends other 3
+ awaitEvents(200, books, 1);
+
+ // Only two out of 4 messages should be delivered, others should be discarded
+ final Response r =
+ createWebClient("/rest/api/bookstore/client-closes-connection/received", MediaType.APPLICATION_JSON)
+ .put(null);
+ assertThat(r.getStatus(), equalTo(204));
+
+ assertThat(eventSource.close(1, TimeUnit.SECONDS), equalTo(true));
+ }
+
+ // Easing the test verification here, it does not work well for Atm + Jetty
+ assertThat(books,
+ hasItems(
+ new Book("New Book #1", 1)
+ )
+ );
+
+ // Only two out of 4 messages should be delivered, others should be discarded
+ final Response r =
+ createWebClient("/rest/api/bookstore/client-closes-connection/closed", MediaType.APPLICATION_JSON)
+ .put(null);
+ assertThat(r.getStatus(), equalTo(204));
+
+ // Give server some time to finish up the sink
+ Thread.sleep(2000);
+
+ // Only two out of 4 messages should be delivered, others should be discarded
+ final BookBroadcasterStats stats =
+ createWebClient("/rest/api/bookstore/client-closes-connection/stats", MediaType.APPLICATION_JSON)
+ .get()
+ .readEntity(BookBroadcasterStats.class);
+
+ // Tomcat will feedback through onError callback, others through onComplete
+ assertThat(stats.isErrored(), equalTo(supportsErrorPropagation()));
+ // The sink should be in closed state
+ assertThat(stats.isWasClosed(), equalTo(true));
+ // The onClose callback should be called
+ assertThat(stats.isClosed(), equalTo(true));
+
+ // It is very hard to get the predictable match here, but at most
+ // 2 events could get through before the client's connection drop off
+ assertTrue(stats.getCompleted() == 2 || stats.getCompleted() == 1);
+ }
+
+ /**
+ * Jetty / Undertow do not propagate errors from the runnable passed to
+ * AsyncContext::start() up to the AsyncEventListener::onError(). Tomcat however
+ * does it.
+ * @return
+ */
+ protected boolean supportsErrorPropagation() {
+ return false;
+ }
+
private static Consumer<InboundSseEvent> collect(final Collection< Book > books) {
return event -> books.add(event.readData(Book.class, MediaType.APPLICATION_JSON_TYPE));
}
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookBroadcasterStats.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookBroadcasterStats.java
new file mode 100644
index 0000000..d82a8a5
--- /dev/null
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookBroadcasterStats.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.systest.jaxrs.sse;
+
+public class BookBroadcasterStats {
+ private int completed;
+ private boolean closed;
+ private boolean errored;
+ private boolean wasClosed;
+
+ public synchronized void inc() {
+ setCompleted(getCompleted() + 1);
+ }
+
+ public synchronized void reset() {
+ setCompleted(0);
+ setClosed(false);
+ setErrored(false);
+ setWasClosed(false);
+ }
+
+ public synchronized void closed() {
+ setClosed(true);
+ }
+
+ public synchronized void errored() {
+ setErrored(true);
+ }
+
+ public int getCompleted() {
+ return completed;
+ }
+
+ public void setCompleted(int completed) {
+ this.completed = completed;
+ }
+
+ public boolean isClosed() {
+ return closed;
+ }
+
+ public void setClosed(boolean closed) {
+ this.closed = closed;
+ }
+
+ public boolean isErrored() {
+ return errored;
+ }
+
+ public void setErrored(boolean errored) {
+ this.errored = errored;
+ }
+
+ public boolean isWasClosed() {
+ return wasClosed;
+ }
+
+ public void setWasClosed(boolean wasClosed) {
+ this.wasClosed = wasClosed;
+ }
+}
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
index ad2387f..a32f731 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
@@ -34,7 +34,6 @@ import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
-import javax.ws.rs.sse.OutboundSseEvent;
import javax.ws.rs.sse.OutboundSseEvent.Builder;
import javax.ws.rs.sse.Sse;
import javax.ws.rs.sse.SseBroadcaster;
@@ -44,7 +43,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Path("/api/bookstore")
-public class BookStore {
+public class BookStore extends BookStoreClientCloseable {
private static final Logger LOG = LoggerFactory.getLogger(BookStore.class);
private final CountDownLatch latch = new CountDownLatch(2);
@@ -150,12 +149,9 @@ public class BookStore {
LOG.error("Wait has been interrupted", ex);
}
}
-
- private static OutboundSseEvent createStatsEvent(final OutboundSseEvent.Builder builder, final int eventId) {
- return builder
- .id(Integer.toString(eventId))
- .data(Book.class, new Book("New Book #" + eventId, eventId))
- .mediaType(MediaType.APPLICATION_JSON_TYPE)
- .build();
+
+ @Override
+ protected Sse getSse() {
+ return sse;
}
}
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
index f78c5ce..d4f0158 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
@@ -34,7 +34,6 @@ import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
-import javax.ws.rs.sse.OutboundSseEvent;
import javax.ws.rs.sse.OutboundSseEvent.Builder;
import javax.ws.rs.sse.Sse;
import javax.ws.rs.sse.SseBroadcaster;
@@ -44,7 +43,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Path("/api/bookstore")
-public class BookStore2 {
+public class BookStore2 extends BookStoreClientCloseable {
private static final Logger LOG = LoggerFactory.getLogger(BookStore2.class);
private final CountDownLatch latch = new CountDownLatch(2);
@@ -149,12 +148,9 @@ public class BookStore2 {
LOG.error("Wait has been interrupted", ex);
}
}
-
- private static OutboundSseEvent createStatsEvent(final OutboundSseEvent.Builder builder, final int eventId) {
- return builder
- .id(Integer.toString(eventId))
- .data(Book.class, new Book("New Book #" + eventId, eventId))
- .mediaType(MediaType.APPLICATION_JSON_TYPE)
- .build();
+
+ @Override
+ protected Sse getSse() {
+ return sse;
}
}
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStoreClientCloseable.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStoreClientCloseable.java
new file mode 100644
index 0000000..b9b599d
--- /dev/null
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStoreClientCloseable.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.systest.jaxrs.sse;
+
+import java.util.concurrent.Phaser;
+
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.HeaderParam;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.sse.OutboundSseEvent;
+import javax.ws.rs.sse.OutboundSseEvent.Builder;
+import javax.ws.rs.sse.Sse;
+import javax.ws.rs.sse.SseBroadcaster;
+import javax.ws.rs.sse.SseEventSink;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+abstract class BookStoreClientCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(BookStore.class);
+
+ private final BookBroadcasterStats stats = new BookBroadcasterStats();
+ private final Phaser phaser = new Phaser(2);
+
+ protected abstract Sse getSse();
+
+ @GET
+ @Path("client-closes-connection/sse/{id}")
+ @Produces(MediaType.SERVER_SENT_EVENTS)
+ public void clientCloseConnection(@Context SseEventSink sink, @PathParam("id") final String idIgnore,
+ @HeaderParam(HttpHeaders.LAST_EVENT_ID_HEADER) @DefaultValue("0") final String lastEventId) {
+
+ stats.reset();
+ new Thread(() -> {
+ try {
+ final Integer id = Integer.valueOf(lastEventId);
+ final Builder builder = getSse().newEventBuilder();
+
+ SseBroadcaster localBroadcaster = getSse().newBroadcaster();
+ localBroadcaster.onError((sseEventSink, throwable) -> stats.errored());
+ localBroadcaster.onClose(sseEventSink -> stats.closed());
+ localBroadcaster.register(sink);
+
+ localBroadcaster.broadcast(createStatsEvent(builder.name("book"), id + 1))
+ .whenComplete((r, ex) -> stats.inc());
+
+ // Await client to confirm the it got the event (PUT /client-closes-connection/received)
+ phaser.arriveAndAwaitAdvance();
+
+ Thread.sleep(500);
+ localBroadcaster.broadcast(createStatsEvent(builder.name("book"), id + 2))
+ .whenComplete((r, ex) -> {
+ // we expect exception here
+ if (ex == null && !sink.isClosed()) {
+ stats.inc();
+ }
+ });
+
+ // Await client to confirm the connection is closed (PUT /client-closes-connection/closed)
+ phaser.arriveAndAwaitAdvance();
+
+ // This event should complete exceptionally since SseEventSource should be
+ // closed already.
+ Thread.sleep(500);
+ localBroadcaster.broadcast(createStatsEvent(builder.name("book"), id + 3))
+ .whenComplete((r, ex) -> {
+ // we expect exception here
+ if (ex == null && !sink.isClosed()) {
+ stats.inc();
+ }
+ });
+
+ // This event should complete immediately since the sink has been removed
+ // from the broadcaster (closed).
+ Thread.sleep(500);
+ localBroadcaster.broadcast(createStatsEvent(builder.name("book"), id + 4))
+ .whenComplete((r, ex) -> {
+ // we expect the sink to be closed at this point
+ if (ex != null || !sink.isClosed()) {
+ stats.inc();
+ }
+ });
+
+ stats.setWasClosed(sink.isClosed());
+ phaser.arriveAndDeregister();
+
+ sink.close();
+ } catch (final InterruptedException ex) {
+ LOG.error("Communication error", ex);
+ }
+ }
+ ).start();
+ }
+
+ @PUT
+ @Path("client-closes-connection/received")
+ @Produces(MediaType.APPLICATION_JSON)
+ public void received() {
+ phaser.arriveAndAwaitAdvance();
+ }
+
+ @PUT
+ @Path("client-closes-connection/closed")
+ @Produces(MediaType.APPLICATION_JSON)
+ public void closed() {
+ phaser.arriveAndDeregister();
+ }
+
+ @GET
+ @Path("client-closes-connection/stats")
+ @Produces(MediaType.APPLICATION_JSON)
+ public BookBroadcasterStats stats() {
+ return stats;
+ }
+
+ protected static OutboundSseEvent createStatsEvent(final OutboundSseEvent.Builder builder, final int eventId) {
+ return builder
+ .id(Integer.toString(eventId))
+ .data(Book.class, new Book("New Book #" + eventId, eventId))
+ .mediaType(MediaType.APPLICATION_JSON_TYPE)
+ .build();
+ }
+}
diff --git a/systests/rs-sse/rs-sse-tomcat/src/test/java/org/apache/cxf/systest/jaxrs/sse/tomcat/TomcatEmbeddedTest.java b/systests/rs-sse/rs-sse-tomcat/src/test/java/org/apache/cxf/systest/jaxrs/sse/tomcat/TomcatEmbeddedTest.java
index 9621b6d..e655391 100644
--- a/systests/rs-sse/rs-sse-tomcat/src/test/java/org/apache/cxf/systest/jaxrs/sse/tomcat/TomcatEmbeddedTest.java
+++ b/systests/rs-sse/rs-sse-tomcat/src/test/java/org/apache/cxf/systest/jaxrs/sse/tomcat/TomcatEmbeddedTest.java
@@ -47,4 +47,8 @@ public class TomcatEmbeddedTest extends AbstractSseTest {
return EmbeddedTomcatServer.PORT;
}
+ @Override
+ protected boolean supportsErrorPropagation() {
+ return true;
+ }
}
diff --git a/systests/rs-sse/rs-sse-tomcat/src/test/java/org/apache/cxf/systest/jaxrs/sse/tomcat/TomcatWarTest.java b/systests/rs-sse/rs-sse-tomcat/src/test/java/org/apache/cxf/systest/jaxrs/sse/tomcat/TomcatWarTest.java
index c58fe4e..58f6194 100644
--- a/systests/rs-sse/rs-sse-tomcat/src/test/java/org/apache/cxf/systest/jaxrs/sse/tomcat/TomcatWarTest.java
+++ b/systests/rs-sse/rs-sse-tomcat/src/test/java/org/apache/cxf/systest/jaxrs/sse/tomcat/TomcatWarTest.java
@@ -47,4 +47,8 @@ public class TomcatWarTest extends AbstractSseTest {
return EmbeddedTomcatServer.PORT;
}
+ @Override
+ protected boolean supportsErrorPropagation() {
+ return true;
+ }
}