You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2020/03/22 17:56:30 UTC

[cxf] branch 3.3.x-fixes updated: CXF-8249: SSE client refuses to accept valid stream

This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch 3.3.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git


The following commit(s) were added to refs/heads/3.3.x-fixes by this push:
     new 1bd2d45  CXF-8249: SSE client refuses to accept valid stream
1bd2d45 is described below

commit 1bd2d4519fa86981d7277ca450b5090ce4c2af55
Author: reta <dr...@gmail.com>
AuthorDate: Sun Mar 22 12:58:18 2020 -0400

    CXF-8249: SSE client refuses to accept valid stream
    
    (cherry picked from commit 16cbb21aa3b2fb1cb0ad5c049ebe4a9771b11b7c)
---
 parent/pom.xml                                     |   5 +
 .../jaxrs/sse/client/InboundSseEventProcessor.java |  50 ++++++--
 .../cxf/systest/jaxrs/sse/AbstractSseTest.java     |  28 ++++-
 .../apache/cxf/systest/jaxrs/sse/BookStore.java    |  39 ++++--
 .../apache/cxf/systest/jaxrs/sse/BookStore2.java   |  39 ++++--
 .../jaxrs/sse/BookStoreClientCloseable.java        |  18 ++-
 systests/spring-boot/pom.xml                       |   4 +
 .../apache/cxf/systest/jaxrs/resources/Book.java   |  19 +++
 .../jaxrs/spring/boot/SpringSseEmitterTest.java    | 132 +++++++++++++++++++++
 9 files changed, 296 insertions(+), 38 deletions(-)

diff --git a/parent/pom.xml b/parent/pom.xml
index be5cf8d..e747132 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -825,6 +825,11 @@
             </dependency>
             <dependency>
                 <groupId>org.apache.cxf</groupId>
+                <artifactId>cxf-rt-rs-sse</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.cxf</groupId>
                 <artifactId>cxf-rt-rs-extension-providers</artifactId>
                 <version>${project.version}</version>
             </dependency>
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
index 2412140..016282c 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
@@ -47,11 +47,11 @@ public class InboundSseEventProcessor {
     public static final MediaType SERVER_SENT_EVENTS_TYPE = MediaType.valueOf(SERVER_SENT_EVENTS);
 
     private static final Logger LOG = LogUtils.getL7dLogger(InboundSseEventProcessor.class);
-    private static final String COMMENT = ": ";
-    private static final String EVENT = "event: ";
-    private static final String ID = "id: ";
-    private static final String RETRY = "retry: ";
-    private static final String DATA = "data: ";
+    private static final String COMMENT = ":";
+    private static final String EVENT = "event:";
+    private static final String ID = "id:";
+    private static final String RETRY = "retry:";
+    private static final String DATA = "data:";
 
     private final Endpoint endpoint;
     private final InboundSseEventListener listener;
@@ -93,16 +93,23 @@ public class InboundSseEventProcessor {
                         builder = null; /* reset the builder for next event */
                         listener.onNext(event);
                     } else {
+                        // Parsing and interpreting event stream: 
+                        // https://www.w3.org/TR/eventsource/#parsing-an-event-stream
                         if (line.startsWith(EVENT)) {
-                            builder = getOrCreate(builder).name(line.substring(EVENT.length()));
+                            int beginIndex = findFirstNonSpacePosition(line, EVENT);
+                            builder = getOrCreate(builder).name(line.substring(beginIndex));
                         } else if (line.startsWith(ID)) {
-                            builder = getOrCreate(builder).id(line.substring(ID.length()));
+                            int beginIndex = findFirstNonSpacePosition(line, ID);
+                            builder = getOrCreate(builder).id(line.substring(beginIndex));
                         } else if (line.startsWith(COMMENT)) {
-                            builder = getOrCreate(builder).comment(line.substring(COMMENT.length()));
+                            int beginIndex = findFirstNonSpacePosition(line, COMMENT);
+                            builder = getOrCreate(builder).comment(line.substring(beginIndex));
                         } else if (line.startsWith(RETRY)) {
-                            builder = getOrCreate(builder).reconnectDelay(line.substring(RETRY.length()));
+                            int beginIndex = findFirstNonSpacePosition(line, RETRY);
+                            builder = getOrCreate(builder).reconnectDelay(line.substring(beginIndex));
                         } else if (line.startsWith(DATA)) {
-                            builder = getOrCreate(builder).appendData(line.substring(DATA.length()));
+                            int beginIndex = findFirstNonSpacePosition(line, DATA);
+                            builder = getOrCreate(builder).appendData(line.substring(beginIndex));
                         }
                     }
                     line = reader.readLine();
@@ -156,4 +163,27 @@ public class InboundSseEventProcessor {
     private static Builder getOrCreate(final Builder builder) {
         return (builder == null) ? new InboundSseEventImpl.Builder() : builder;
     }
+    
+    /**
+     * Remove only leading spaces from the line as per specification, space after 
+     * the colon is optional.
+     * 
+     * The following stream fires two identical events:
+     * 
+     *   data:test
+     *   data: test
+     *   
+     *   This is because the space after the colon is ignored if present.
+     */
+    private static int findFirstNonSpacePosition(final String str, final String prefix) {
+        int beginIndex = prefix.length();
+        
+        for (; beginIndex < str.length(); ++beginIndex) {
+            if (str.charAt(beginIndex) != ' ') {
+                break;
+            }
+        }
+        
+        return beginIndex;
+    }
 }
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
index 370d39b..f6ad024 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
@@ -104,6 +104,28 @@ public abstract class AbstractSseTest extends AbstractSseBaseTest {
     }
 
     @Test
+    public void testBookTitlesStreamIsReturnedFromInboundSseEvents() throws InterruptedException {
+        final WebTarget target = createWebTarget("/rest/api/bookstore/titles/sse");
+        final Collection<String> titles = new ArrayList<>();
+
+        try (SseEventSource eventSource = SseEventSource.target(target).build()) {
+            eventSource.register(collectRaw(titles), System.out::println);
+            eventSource.open();
+            // Give the SSE stream some time to collect all events
+            awaitEvents(5000, titles, 4);
+        }
+        // Easing the test verification here, it does not work well for Atm + Jetty
+        assertThat(titles,
+            hasItems(
+                "New Book #1",
+                "New Book #2",
+                "New Book #3",
+                "New Book #4"
+            )
+        );
+    }
+
+    @Test
     public void testNoDataIsReturnedFromInboundSseEvents() throws InterruptedException {
         final WebTarget target = createWebTarget("/rest/api/bookstore/nodata");
         final Collection<Book> books = new ArrayList<>();
@@ -313,7 +335,11 @@ public abstract class AbstractSseTest extends AbstractSseBaseTest {
         return false;
     }
 
-    private static Consumer<InboundSseEvent> collect(final Collection< Book > books) {
+    private static Consumer<InboundSseEvent> collect(final Collection<Book> books) {
         return event -> books.add(event.readData(Book.class, MediaType.APPLICATION_JSON_TYPE));
     }
+    
+    private static Consumer<InboundSseEvent> collectRaw(final Collection<String> titles) {
+        return event -> titles.add(event.readData(String.class, MediaType.TEXT_PLAIN_TYPE));
+    }
 }
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
index 41ffd34..de28eef 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
@@ -78,13 +78,13 @@ public class BookStore extends BookStoreClientCloseable {
                     final Integer id = Integer.valueOf(lastEventId);
                     final Builder builder = sse.newEventBuilder();
 
-                    sink.send(createStatsEvent(builder.name("book"), id + 1));
+                    sink.send(createEvent(builder.name("book"), id + 1));
                     Thread.sleep(200);
-                    sink.send(createStatsEvent(builder.name("book"), id + 2));
+                    sink.send(createEvent(builder.name("book"), id + 2));
                     Thread.sleep(200);
-                    sink.send(createStatsEvent(builder.name("book"), id + 3));
+                    sink.send(createEvent(builder.name("book"), id + 3));
                     Thread.sleep(200);
-                    sink.send(createStatsEvent(builder.name("book"), id + 4));
+                    sink.send(createEvent(builder.name("book"), id + 4));
                     Thread.sleep(200);
                     sink.close();
                 } catch (final InterruptedException ex) {
@@ -102,11 +102,28 @@ public class BookStore extends BookStoreClientCloseable {
         
         CompletableFuture
             .runAsync(() -> {
-                sink.send(createStatsEvent(builder.name("book"), 1));
-                sink.send(createStatsEvent(builder.name("book"), 2));
-                sink.send(createStatsEvent(builder.name("book"), 3));
-                sink.send(createStatsEvent(builder.name("book"), 4));
-                sink.send(createStatsEvent(builder.name("book"), 5));
+                sink.send(createEvent(builder.name("book"), 1));
+                sink.send(createEvent(builder.name("book"), 2));
+                sink.send(createEvent(builder.name("book"), 3));
+                sink.send(createEvent(builder.name("book"), 4));
+                sink.send(createEvent(builder.name("book"), 5));
+            })
+            .whenComplete((r, ex) -> sink.close());
+    }
+
+    @GET
+    @Path("/titles/sse")
+    @Produces(MediaType.SERVER_SENT_EVENTS)
+    public void forBookTitlesOnly(@Context SseEventSink sink) {
+        final Builder builder = sse.newEventBuilder();
+        
+        CompletableFuture
+            .runAsync(() -> {
+                sink.send(createRawEvent(builder.name("book"), 1));
+                sink.send(createRawEvent(builder.name("book"), 2));
+                sink.send(createRawEvent(builder.name("book"), 3));
+                sink.send(createRawEvent(builder.name("book"), 4));
+                sink.send(createRawEvent(builder.name("book"), 5));
             })
             .whenComplete((r, ex) -> sink.close());
     }
@@ -139,8 +156,8 @@ public class BookStore extends BookStoreClientCloseable {
             }
 
             final Builder builder = sse.newEventBuilder();
-            broadcaster.broadcast(createStatsEvent(builder.name("book"), 1000))
-                .thenAcceptBoth(broadcaster.broadcast(createStatsEvent(builder.name("book"), 2000)), (a, b) -> { })
+            broadcaster.broadcast(createEvent(builder.name("book"), 1000))
+                .thenAcceptBoth(broadcaster.broadcast(createEvent(builder.name("book"), 2000)), (a, b) -> { })
                 .whenComplete((r, ex) -> { 
                     if (broadcaster != null) {
                         broadcaster.close();
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
index d7abb04..a922d08 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
@@ -77,13 +77,13 @@ public class BookStore2 extends BookStoreClientCloseable {
                     final Integer id = Integer.valueOf(lastEventId);
                     final Builder builder = sse.newEventBuilder();
 
-                    sink.send(createStatsEvent(builder.name("book"), id + 1));
+                    sink.send(createEvent(builder.name("book"), id + 1));
                     Thread.sleep(200);
-                    sink.send(createStatsEvent(builder.name("book"), id + 2));
+                    sink.send(createEvent(builder.name("book"), id + 2));
                     Thread.sleep(200);
-                    sink.send(createStatsEvent(builder.name("book"), id + 3));
+                    sink.send(createEvent(builder.name("book"), id + 3));
                     Thread.sleep(200);
-                    sink.send(createStatsEvent(builder.name("book"), id + 4));
+                    sink.send(createEvent(builder.name("book"), id + 4));
                     Thread.sleep(200);
                     sink.close();
                 } catch (final InterruptedException ex) {
@@ -101,11 +101,28 @@ public class BookStore2 extends BookStoreClientCloseable {
         
         CompletableFuture
             .runAsync(() -> {
-                sink.send(createStatsEvent(builder.name("book"), 1));
-                sink.send(createStatsEvent(builder.name("book"), 2));
-                sink.send(createStatsEvent(builder.name("book"), 3));
-                sink.send(createStatsEvent(builder.name("book"), 4));
-                sink.send(createStatsEvent(builder.name("book"), 5));
+                sink.send(createEvent(builder.name("book"), 1));
+                sink.send(createEvent(builder.name("book"), 2));
+                sink.send(createEvent(builder.name("book"), 3));
+                sink.send(createEvent(builder.name("book"), 4));
+                sink.send(createEvent(builder.name("book"), 5));
+            })
+            .whenComplete((r, ex) -> sink.close());
+    }
+
+    @GET
+    @Path("/titles/sse")
+    @Produces(MediaType.SERVER_SENT_EVENTS)
+    public void forBookTitlesOnly(@Context SseEventSink sink) {
+        final Builder builder = sse.newEventBuilder();
+        
+        CompletableFuture
+            .runAsync(() -> {
+                sink.send(createRawEvent(builder.name("book"), 1));
+                sink.send(createRawEvent(builder.name("book"), 2));
+                sink.send(createRawEvent(builder.name("book"), 3));
+                sink.send(createRawEvent(builder.name("book"), 4));
+                sink.send(createRawEvent(builder.name("book"), 5));
             })
             .whenComplete((r, ex) -> sink.close());
     }
@@ -138,8 +155,8 @@ public class BookStore2 extends BookStoreClientCloseable {
             }
 
             final Builder builder = sse.newEventBuilder();
-            broadcaster.broadcast(createStatsEvent(builder.name("book"), 1000))
-                .thenAcceptBoth(broadcaster.broadcast(createStatsEvent(builder.name("book"), 2000)), (a, b) -> { })
+            broadcaster.broadcast(createEvent(builder.name("book"), 1000))
+                .thenAcceptBoth(broadcaster.broadcast(createEvent(builder.name("book"), 2000)), (a, b) -> { })
                 .whenComplete((r, ex) -> { 
                     if (broadcaster != null) {
                         broadcaster.close();
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStoreClientCloseable.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStoreClientCloseable.java
index b9b599d..2fb50a6 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStoreClientCloseable.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStoreClientCloseable.java
@@ -64,14 +64,14 @@ abstract class BookStoreClientCloseable {
                 localBroadcaster.onClose(sseEventSink -> stats.closed());
                 localBroadcaster.register(sink);
 
-                localBroadcaster.broadcast(createStatsEvent(builder.name("book"), id + 1))
+                localBroadcaster.broadcast(createEvent(builder.name("book"), id + 1))
                     .whenComplete((r, ex) -> stats.inc());
                 
                 // Await client to confirm the it got the event (PUT /client-closes-connection/received)
                 phaser.arriveAndAwaitAdvance();
                 
                 Thread.sleep(500);
-                localBroadcaster.broadcast(createStatsEvent(builder.name("book"), id + 2))
+                localBroadcaster.broadcast(createEvent(builder.name("book"), id + 2))
                     .whenComplete((r, ex) -> { 
                         // we expect exception here
                         if (ex == null && !sink.isClosed()) {
@@ -85,7 +85,7 @@ abstract class BookStoreClientCloseable {
                 // This event should complete exceptionally since SseEventSource should be 
                 // closed already.
                 Thread.sleep(500);
-                localBroadcaster.broadcast(createStatsEvent(builder.name("book"), id + 3))
+                localBroadcaster.broadcast(createEvent(builder.name("book"), id + 3))
                     .whenComplete((r, ex) -> { 
                         // we expect exception here
                         if (ex == null && !sink.isClosed()) {
@@ -96,7 +96,7 @@ abstract class BookStoreClientCloseable {
                 // This event should complete immediately since the sink has been removed
                 // from the broadcaster (closed).
                 Thread.sleep(500);
-                localBroadcaster.broadcast(createStatsEvent(builder.name("book"), id + 4))
+                localBroadcaster.broadcast(createEvent(builder.name("book"), id + 4))
                     .whenComplete((r, ex) -> {
                         // we expect the sink to be closed at this point
                         if (ex != null || !sink.isClosed()) {
@@ -136,11 +136,19 @@ abstract class BookStoreClientCloseable {
         return stats;
     }
     
-    protected static OutboundSseEvent createStatsEvent(final OutboundSseEvent.Builder builder, final int eventId) {
+    protected static OutboundSseEvent createEvent(final OutboundSseEvent.Builder builder, final int eventId) {
         return builder
             .id(Integer.toString(eventId))
             .data(Book.class, new Book("New Book #" + eventId, eventId))
             .mediaType(MediaType.APPLICATION_JSON_TYPE)
             .build();
     }
+    
+    protected static OutboundSseEvent createRawEvent(final OutboundSseEvent.Builder builder, final int eventId) {
+        return builder
+            .id(Integer.toString(eventId))
+            .data("New Book #" + eventId)
+            .mediaType(MediaType.TEXT_PLAIN_TYPE)
+            .build();
+    }
 }
diff --git a/systests/spring-boot/pom.xml b/systests/spring-boot/pom.xml
index 18a1cdf..8f38e1a 100644
--- a/systests/spring-boot/pom.xml
+++ b/systests/spring-boot/pom.xml
@@ -103,6 +103,10 @@
         </dependency>
         <dependency>
             <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-rs-sse</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
             <artifactId>cxf-rt-rs-service-description-openapi-v3</artifactId>
         </dependency>
         <dependency>
diff --git a/systests/spring-boot/src/test/java/org/apache/cxf/systest/jaxrs/resources/Book.java b/systests/spring-boot/src/test/java/org/apache/cxf/systest/jaxrs/resources/Book.java
index 7fd0f20..7e2eeeb 100644
--- a/systests/spring-boot/src/test/java/org/apache/cxf/systest/jaxrs/resources/Book.java
+++ b/systests/spring-boot/src/test/java/org/apache/cxf/systest/jaxrs/resources/Book.java
@@ -19,6 +19,10 @@
 
 package org.apache.cxf.systest.jaxrs.resources;
 
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+
 public class Book {
     private String title;
     private String author;
@@ -46,4 +50,19 @@ public class Book {
     public void setAuthor(String author) {
         this.author = author;
     }
+    
+    @Override
+    public int hashCode() {
+        return HashCodeBuilder.reflectionHashCode(this);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        return EqualsBuilder.reflectionEquals(this, obj);
+    }
+    
+    @Override
+    public String toString() {
+        return ToStringBuilder.reflectionToString(this);
+    }
 }
diff --git a/systests/spring-boot/src/test/java/org/apache/cxf/systest/jaxrs/spring/boot/SpringSseEmitterTest.java b/systests/spring-boot/src/test/java/org/apache/cxf/systest/jaxrs/spring/boot/SpringSseEmitterTest.java
new file mode 100644
index 0000000..ad2e7ad
--- /dev/null
+++ b/systests/spring-boot/src/test/java/org/apache/cxf/systest/jaxrs/spring/boot/SpringSseEmitterTest.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.spring.boot;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.sse.InboundSseEvent;
+import javax.ws.rs.sse.SseEventSource;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.systest.jaxrs.resources.Book;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
+import org.springframework.boot.web.server.LocalServerPort;
+import org.springframework.http.MediaType;
+import org.springframework.test.context.junit4.SpringRunner;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
+import org.springframework.web.servlet.mvc.method.annotation.SseEmitter.SseEventBuilder;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, classes = SpringSseEmitterTest.LibraryController.class)
+public class SpringSseEmitterTest {
+    @LocalServerPort
+    private int port;
+    
+    @RestController
+    @EnableAutoConfiguration
+    static class LibraryController {
+        @GetMapping("/sse")
+        public SseEmitter streamSseMvc() {
+            final SseEmitter emitter = new SseEmitter();
+            final ExecutorService sseMvcExecutor = Executors.newSingleThreadExecutor();
+            
+            sseMvcExecutor.execute(() -> {
+                try {
+                    for (int eventId = 1; eventId <= 5; ++eventId) {
+                        SseEventBuilder event = SseEmitter.event()
+                            .id(Integer.toString(eventId))
+                            .data(new Book("New Book #" + eventId, "Author #" + eventId), MediaType.APPLICATION_JSON)
+                            .name("book");
+                        emitter.send(event);
+                        Thread.sleep(100);
+                    }
+                } catch (Exception ex) {
+                    emitter.completeWithError(ex);
+                }
+            });
+            
+            return emitter;
+        }
+    }
+    
+    @Test
+    public void testSseEvents() throws InterruptedException {
+        final WebTarget target = createWebTarget();
+        final Collection<Book> books = new ArrayList<>();
+
+        try (SseEventSource eventSource = SseEventSource.target(target).build()) {
+            eventSource.register(collect(books), System.out::println);
+            eventSource.open();
+            // Give the SSE stream some time to collect all events
+            awaitEvents(5000, books, 5);
+        }
+
+        assertThat(books,
+            hasItems(
+                new Book("New Book #1", "Author #1"),
+                new Book("New Book #2", "Author #2"),
+                new Book("New Book #3", "Author #3"),
+                new Book("New Book #4", "Author #4"),
+                new Book("New Book #5", "Author #5")
+            )
+        );
+    }
+    
+    private WebTarget createWebTarget() {
+        return ClientBuilder
+            .newClient()
+            .property("http.receive.timeout", 8000)
+            .register(JacksonJsonProvider.class)
+            .target("http://localhost:" + port + "/sse");
+    }
+
+    private static Consumer<InboundSseEvent> collect(final Collection< Book > books) {
+        return event -> books.add(event.readData(Book.class, javax.ws.rs.core.MediaType.APPLICATION_JSON_TYPE));
+    }
+    
+    private void awaitEvents(long timeout, final Collection<?> events, int size) throws InterruptedException {
+        final long sleep = timeout / 10;
+        
+        for (int i = 0; i < timeout; i += sleep) {
+            if (events.size() == size) {
+                break;
+            } else {
+                Thread.sleep(sleep);
+            }
+        }
+    }
+}