You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2020/03/22 17:56:30 UTC
[cxf] branch 3.3.x-fixes updated: CXF-8249: SSE client refuses to
accept valid stream
This is an automated email from the ASF dual-hosted git repository.
reta pushed a commit to branch 3.3.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git
The following commit(s) were added to refs/heads/3.3.x-fixes by this push:
new 1bd2d45 CXF-8249: SSE client refuses to accept valid stream
1bd2d45 is described below
commit 1bd2d4519fa86981d7277ca450b5090ce4c2af55
Author: reta <dr...@gmail.com>
AuthorDate: Sun Mar 22 12:58:18 2020 -0400
CXF-8249: SSE client refuses to accept valid stream
(cherry picked from commit 16cbb21aa3b2fb1cb0ad5c049ebe4a9771b11b7c)
---
parent/pom.xml | 5 +
.../jaxrs/sse/client/InboundSseEventProcessor.java | 50 ++++++--
.../cxf/systest/jaxrs/sse/AbstractSseTest.java | 28 ++++-
.../apache/cxf/systest/jaxrs/sse/BookStore.java | 39 ++++--
.../apache/cxf/systest/jaxrs/sse/BookStore2.java | 39 ++++--
.../jaxrs/sse/BookStoreClientCloseable.java | 18 ++-
systests/spring-boot/pom.xml | 4 +
.../apache/cxf/systest/jaxrs/resources/Book.java | 19 +++
.../jaxrs/spring/boot/SpringSseEmitterTest.java | 132 +++++++++++++++++++++
9 files changed, 296 insertions(+), 38 deletions(-)
diff --git a/parent/pom.xml b/parent/pom.xml
index be5cf8d..e747132 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -825,6 +825,11 @@
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-rs-sse</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-rs-extension-providers</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
index 2412140..016282c 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
@@ -47,11 +47,11 @@ public class InboundSseEventProcessor {
public static final MediaType SERVER_SENT_EVENTS_TYPE = MediaType.valueOf(SERVER_SENT_EVENTS);
private static final Logger LOG = LogUtils.getL7dLogger(InboundSseEventProcessor.class);
- private static final String COMMENT = ": ";
- private static final String EVENT = "event: ";
- private static final String ID = "id: ";
- private static final String RETRY = "retry: ";
- private static final String DATA = "data: ";
+ private static final String COMMENT = ":";
+ private static final String EVENT = "event:";
+ private static final String ID = "id:";
+ private static final String RETRY = "retry:";
+ private static final String DATA = "data:";
private final Endpoint endpoint;
private final InboundSseEventListener listener;
@@ -93,16 +93,23 @@ public class InboundSseEventProcessor {
builder = null; /* reset the builder for next event */
listener.onNext(event);
} else {
+ // Parsing and interpreting event stream:
+ // https://www.w3.org/TR/eventsource/#parsing-an-event-stream
if (line.startsWith(EVENT)) {
- builder = getOrCreate(builder).name(line.substring(EVENT.length()));
+ int beginIndex = findFirstNonSpacePosition(line, EVENT);
+ builder = getOrCreate(builder).name(line.substring(beginIndex));
} else if (line.startsWith(ID)) {
- builder = getOrCreate(builder).id(line.substring(ID.length()));
+ int beginIndex = findFirstNonSpacePosition(line, ID);
+ builder = getOrCreate(builder).id(line.substring(beginIndex));
} else if (line.startsWith(COMMENT)) {
- builder = getOrCreate(builder).comment(line.substring(COMMENT.length()));
+ int beginIndex = findFirstNonSpacePosition(line, COMMENT);
+ builder = getOrCreate(builder).comment(line.substring(beginIndex));
} else if (line.startsWith(RETRY)) {
- builder = getOrCreate(builder).reconnectDelay(line.substring(RETRY.length()));
+ int beginIndex = findFirstNonSpacePosition(line, RETRY);
+ builder = getOrCreate(builder).reconnectDelay(line.substring(beginIndex));
} else if (line.startsWith(DATA)) {
- builder = getOrCreate(builder).appendData(line.substring(DATA.length()));
+ int beginIndex = findFirstNonSpacePosition(line, DATA);
+ builder = getOrCreate(builder).appendData(line.substring(beginIndex));
}
}
line = reader.readLine();
@@ -156,4 +163,27 @@ public class InboundSseEventProcessor {
private static Builder getOrCreate(final Builder builder) {
return (builder == null) ? new InboundSseEventImpl.Builder() : builder;
}
+
+ /**
+ * Remove only leading spaces from the line as per specification, space after
+ * the colon is optional.
+ *
+ * The following stream fires two identical events:
+ *
+ * data:test
+ * data: test
+ *
+ * This is because the space after the colon is ignored if present.
+ */
+ private static int findFirstNonSpacePosition(final String str, final String prefix) {
+ int beginIndex = prefix.length();
+
+ for (; beginIndex < str.length(); ++beginIndex) {
+ if (str.charAt(beginIndex) != ' ') {
+ break;
+ }
+ }
+
+ return beginIndex;
+ }
}
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
index 370d39b..f6ad024 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
@@ -104,6 +104,28 @@ public abstract class AbstractSseTest extends AbstractSseBaseTest {
}
@Test
+ public void testBookTitlesStreamIsReturnedFromInboundSseEvents() throws InterruptedException {
+ final WebTarget target = createWebTarget("/rest/api/bookstore/titles/sse");
+ final Collection<String> titles = new ArrayList<>();
+
+ try (SseEventSource eventSource = SseEventSource.target(target).build()) {
+ eventSource.register(collectRaw(titles), System.out::println);
+ eventSource.open();
+ // Give the SSE stream some time to collect all events
+ awaitEvents(5000, titles, 4);
+ }
+ // Easing the test verification here, it does not work well for Atm + Jetty
+ assertThat(titles,
+ hasItems(
+ "New Book #1",
+ "New Book #2",
+ "New Book #3",
+ "New Book #4"
+ )
+ );
+ }
+
+ @Test
public void testNoDataIsReturnedFromInboundSseEvents() throws InterruptedException {
final WebTarget target = createWebTarget("/rest/api/bookstore/nodata");
final Collection<Book> books = new ArrayList<>();
@@ -313,7 +335,11 @@ public abstract class AbstractSseTest extends AbstractSseBaseTest {
return false;
}
- private static Consumer<InboundSseEvent> collect(final Collection< Book > books) {
+ private static Consumer<InboundSseEvent> collect(final Collection<Book> books) {
return event -> books.add(event.readData(Book.class, MediaType.APPLICATION_JSON_TYPE));
}
+
+ private static Consumer<InboundSseEvent> collectRaw(final Collection<String> titles) {
+ return event -> titles.add(event.readData(String.class, MediaType.TEXT_PLAIN_TYPE));
+ }
}
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
index 41ffd34..de28eef 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
@@ -78,13 +78,13 @@ public class BookStore extends BookStoreClientCloseable {
final Integer id = Integer.valueOf(lastEventId);
final Builder builder = sse.newEventBuilder();
- sink.send(createStatsEvent(builder.name("book"), id + 1));
+ sink.send(createEvent(builder.name("book"), id + 1));
Thread.sleep(200);
- sink.send(createStatsEvent(builder.name("book"), id + 2));
+ sink.send(createEvent(builder.name("book"), id + 2));
Thread.sleep(200);
- sink.send(createStatsEvent(builder.name("book"), id + 3));
+ sink.send(createEvent(builder.name("book"), id + 3));
Thread.sleep(200);
- sink.send(createStatsEvent(builder.name("book"), id + 4));
+ sink.send(createEvent(builder.name("book"), id + 4));
Thread.sleep(200);
sink.close();
} catch (final InterruptedException ex) {
@@ -102,11 +102,28 @@ public class BookStore extends BookStoreClientCloseable {
CompletableFuture
.runAsync(() -> {
- sink.send(createStatsEvent(builder.name("book"), 1));
- sink.send(createStatsEvent(builder.name("book"), 2));
- sink.send(createStatsEvent(builder.name("book"), 3));
- sink.send(createStatsEvent(builder.name("book"), 4));
- sink.send(createStatsEvent(builder.name("book"), 5));
+ sink.send(createEvent(builder.name("book"), 1));
+ sink.send(createEvent(builder.name("book"), 2));
+ sink.send(createEvent(builder.name("book"), 3));
+ sink.send(createEvent(builder.name("book"), 4));
+ sink.send(createEvent(builder.name("book"), 5));
+ })
+ .whenComplete((r, ex) -> sink.close());
+ }
+
+ @GET
+ @Path("/titles/sse")
+ @Produces(MediaType.SERVER_SENT_EVENTS)
+ public void forBookTitlesOnly(@Context SseEventSink sink) {
+ final Builder builder = sse.newEventBuilder();
+
+ CompletableFuture
+ .runAsync(() -> {
+ sink.send(createRawEvent(builder.name("book"), 1));
+ sink.send(createRawEvent(builder.name("book"), 2));
+ sink.send(createRawEvent(builder.name("book"), 3));
+ sink.send(createRawEvent(builder.name("book"), 4));
+ sink.send(createRawEvent(builder.name("book"), 5));
})
.whenComplete((r, ex) -> sink.close());
}
@@ -139,8 +156,8 @@ public class BookStore extends BookStoreClientCloseable {
}
final Builder builder = sse.newEventBuilder();
- broadcaster.broadcast(createStatsEvent(builder.name("book"), 1000))
- .thenAcceptBoth(broadcaster.broadcast(createStatsEvent(builder.name("book"), 2000)), (a, b) -> { })
+ broadcaster.broadcast(createEvent(builder.name("book"), 1000))
+ .thenAcceptBoth(broadcaster.broadcast(createEvent(builder.name("book"), 2000)), (a, b) -> { })
.whenComplete((r, ex) -> {
if (broadcaster != null) {
broadcaster.close();
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
index d7abb04..a922d08 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
@@ -77,13 +77,13 @@ public class BookStore2 extends BookStoreClientCloseable {
final Integer id = Integer.valueOf(lastEventId);
final Builder builder = sse.newEventBuilder();
- sink.send(createStatsEvent(builder.name("book"), id + 1));
+ sink.send(createEvent(builder.name("book"), id + 1));
Thread.sleep(200);
- sink.send(createStatsEvent(builder.name("book"), id + 2));
+ sink.send(createEvent(builder.name("book"), id + 2));
Thread.sleep(200);
- sink.send(createStatsEvent(builder.name("book"), id + 3));
+ sink.send(createEvent(builder.name("book"), id + 3));
Thread.sleep(200);
- sink.send(createStatsEvent(builder.name("book"), id + 4));
+ sink.send(createEvent(builder.name("book"), id + 4));
Thread.sleep(200);
sink.close();
} catch (final InterruptedException ex) {
@@ -101,11 +101,28 @@ public class BookStore2 extends BookStoreClientCloseable {
CompletableFuture
.runAsync(() -> {
- sink.send(createStatsEvent(builder.name("book"), 1));
- sink.send(createStatsEvent(builder.name("book"), 2));
- sink.send(createStatsEvent(builder.name("book"), 3));
- sink.send(createStatsEvent(builder.name("book"), 4));
- sink.send(createStatsEvent(builder.name("book"), 5));
+ sink.send(createEvent(builder.name("book"), 1));
+ sink.send(createEvent(builder.name("book"), 2));
+ sink.send(createEvent(builder.name("book"), 3));
+ sink.send(createEvent(builder.name("book"), 4));
+ sink.send(createEvent(builder.name("book"), 5));
+ })
+ .whenComplete((r, ex) -> sink.close());
+ }
+
+ @GET
+ @Path("/titles/sse")
+ @Produces(MediaType.SERVER_SENT_EVENTS)
+ public void forBookTitlesOnly(@Context SseEventSink sink) {
+ final Builder builder = sse.newEventBuilder();
+
+ CompletableFuture
+ .runAsync(() -> {
+ sink.send(createRawEvent(builder.name("book"), 1));
+ sink.send(createRawEvent(builder.name("book"), 2));
+ sink.send(createRawEvent(builder.name("book"), 3));
+ sink.send(createRawEvent(builder.name("book"), 4));
+ sink.send(createRawEvent(builder.name("book"), 5));
})
.whenComplete((r, ex) -> sink.close());
}
@@ -138,8 +155,8 @@ public class BookStore2 extends BookStoreClientCloseable {
}
final Builder builder = sse.newEventBuilder();
- broadcaster.broadcast(createStatsEvent(builder.name("book"), 1000))
- .thenAcceptBoth(broadcaster.broadcast(createStatsEvent(builder.name("book"), 2000)), (a, b) -> { })
+ broadcaster.broadcast(createEvent(builder.name("book"), 1000))
+ .thenAcceptBoth(broadcaster.broadcast(createEvent(builder.name("book"), 2000)), (a, b) -> { })
.whenComplete((r, ex) -> {
if (broadcaster != null) {
broadcaster.close();
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStoreClientCloseable.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStoreClientCloseable.java
index b9b599d..2fb50a6 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStoreClientCloseable.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStoreClientCloseable.java
@@ -64,14 +64,14 @@ abstract class BookStoreClientCloseable {
localBroadcaster.onClose(sseEventSink -> stats.closed());
localBroadcaster.register(sink);
- localBroadcaster.broadcast(createStatsEvent(builder.name("book"), id + 1))
+ localBroadcaster.broadcast(createEvent(builder.name("book"), id + 1))
.whenComplete((r, ex) -> stats.inc());
// Await client to confirm the it got the event (PUT /client-closes-connection/received)
phaser.arriveAndAwaitAdvance();
Thread.sleep(500);
- localBroadcaster.broadcast(createStatsEvent(builder.name("book"), id + 2))
+ localBroadcaster.broadcast(createEvent(builder.name("book"), id + 2))
.whenComplete((r, ex) -> {
// we expect exception here
if (ex == null && !sink.isClosed()) {
@@ -85,7 +85,7 @@ abstract class BookStoreClientCloseable {
// This event should complete exceptionally since SseEventSource should be
// closed already.
Thread.sleep(500);
- localBroadcaster.broadcast(createStatsEvent(builder.name("book"), id + 3))
+ localBroadcaster.broadcast(createEvent(builder.name("book"), id + 3))
.whenComplete((r, ex) -> {
// we expect exception here
if (ex == null && !sink.isClosed()) {
@@ -96,7 +96,7 @@ abstract class BookStoreClientCloseable {
// This event should complete immediately since the sink has been removed
// from the broadcaster (closed).
Thread.sleep(500);
- localBroadcaster.broadcast(createStatsEvent(builder.name("book"), id + 4))
+ localBroadcaster.broadcast(createEvent(builder.name("book"), id + 4))
.whenComplete((r, ex) -> {
// we expect the sink to be closed at this point
if (ex != null || !sink.isClosed()) {
@@ -136,11 +136,19 @@ abstract class BookStoreClientCloseable {
return stats;
}
- protected static OutboundSseEvent createStatsEvent(final OutboundSseEvent.Builder builder, final int eventId) {
+ protected static OutboundSseEvent createEvent(final OutboundSseEvent.Builder builder, final int eventId) {
return builder
.id(Integer.toString(eventId))
.data(Book.class, new Book("New Book #" + eventId, eventId))
.mediaType(MediaType.APPLICATION_JSON_TYPE)
.build();
}
+
+ protected static OutboundSseEvent createRawEvent(final OutboundSseEvent.Builder builder, final int eventId) {
+ return builder
+ .id(Integer.toString(eventId))
+ .data("New Book #" + eventId)
+ .mediaType(MediaType.TEXT_PLAIN_TYPE)
+ .build();
+ }
}
diff --git a/systests/spring-boot/pom.xml b/systests/spring-boot/pom.xml
index 18a1cdf..8f38e1a 100644
--- a/systests/spring-boot/pom.xml
+++ b/systests/spring-boot/pom.xml
@@ -103,6 +103,10 @@
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-rs-sse</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-rs-service-description-openapi-v3</artifactId>
</dependency>
<dependency>
diff --git a/systests/spring-boot/src/test/java/org/apache/cxf/systest/jaxrs/resources/Book.java b/systests/spring-boot/src/test/java/org/apache/cxf/systest/jaxrs/resources/Book.java
index 7fd0f20..7e2eeeb 100644
--- a/systests/spring-boot/src/test/java/org/apache/cxf/systest/jaxrs/resources/Book.java
+++ b/systests/spring-boot/src/test/java/org/apache/cxf/systest/jaxrs/resources/Book.java
@@ -19,6 +19,10 @@
package org.apache.cxf.systest.jaxrs.resources;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+
public class Book {
private String title;
private String author;
@@ -46,4 +50,19 @@ public class Book {
public void setAuthor(String author) {
this.author = author;
}
+
+ @Override
+ public int hashCode() {
+ return HashCodeBuilder.reflectionHashCode(this);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return EqualsBuilder.reflectionEquals(this, obj);
+ }
+
+ @Override
+ public String toString() {
+ return ToStringBuilder.reflectionToString(this);
+ }
}
diff --git a/systests/spring-boot/src/test/java/org/apache/cxf/systest/jaxrs/spring/boot/SpringSseEmitterTest.java b/systests/spring-boot/src/test/java/org/apache/cxf/systest/jaxrs/spring/boot/SpringSseEmitterTest.java
new file mode 100644
index 0000000..ad2e7ad
--- /dev/null
+++ b/systests/spring-boot/src/test/java/org/apache/cxf/systest/jaxrs/spring/boot/SpringSseEmitterTest.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.spring.boot;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.sse.InboundSseEvent;
+import javax.ws.rs.sse.SseEventSource;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.systest.jaxrs.resources.Book;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
+import org.springframework.boot.web.server.LocalServerPort;
+import org.springframework.http.MediaType;
+import org.springframework.test.context.junit4.SpringRunner;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
+import org.springframework.web.servlet.mvc.method.annotation.SseEmitter.SseEventBuilder;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, classes = SpringSseEmitterTest.LibraryController.class)
+public class SpringSseEmitterTest {
+ @LocalServerPort
+ private int port;
+
+ @RestController
+ @EnableAutoConfiguration
+ static class LibraryController {
+ @GetMapping("/sse")
+ public SseEmitter streamSseMvc() {
+ final SseEmitter emitter = new SseEmitter();
+ final ExecutorService sseMvcExecutor = Executors.newSingleThreadExecutor();
+
+ sseMvcExecutor.execute(() -> {
+ try {
+ for (int eventId = 1; eventId <= 5; ++eventId) {
+ SseEventBuilder event = SseEmitter.event()
+ .id(Integer.toString(eventId))
+ .data(new Book("New Book #" + eventId, "Author #" + eventId), MediaType.APPLICATION_JSON)
+ .name("book");
+ emitter.send(event);
+ Thread.sleep(100);
+ }
+ } catch (Exception ex) {
+ emitter.completeWithError(ex);
+ }
+ });
+
+ return emitter;
+ }
+ }
+
+ @Test
+ public void testSseEvents() throws InterruptedException {
+ final WebTarget target = createWebTarget();
+ final Collection<Book> books = new ArrayList<>();
+
+ try (SseEventSource eventSource = SseEventSource.target(target).build()) {
+ eventSource.register(collect(books), System.out::println);
+ eventSource.open();
+ // Give the SSE stream some time to collect all events
+ awaitEvents(5000, books, 5);
+ }
+
+ assertThat(books,
+ hasItems(
+ new Book("New Book #1", "Author #1"),
+ new Book("New Book #2", "Author #2"),
+ new Book("New Book #3", "Author #3"),
+ new Book("New Book #4", "Author #4"),
+ new Book("New Book #5", "Author #5")
+ )
+ );
+ }
+
+ private WebTarget createWebTarget() {
+ return ClientBuilder
+ .newClient()
+ .property("http.receive.timeout", 8000)
+ .register(JacksonJsonProvider.class)
+ .target("http://localhost:" + port + "/sse");
+ }
+
+ private static Consumer<InboundSseEvent> collect(final Collection< Book > books) {
+ return event -> books.add(event.readData(Book.class, javax.ws.rs.core.MediaType.APPLICATION_JSON_TYPE));
+ }
+
+ private void awaitEvents(long timeout, final Collection<?> events, int size) throws InterruptedException {
+ final long sleep = timeout / 10;
+
+ for (int i = 0; i < timeout; i += sleep) {
+ if (events.size() == size) {
+ break;
+ } else {
+ Thread.sleep(sleep);
+ }
+ }
+ }
+}