You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2020/03/22 19:25:03 UTC
[cxf] 01/02: CXF-8249: SSE client refuses to accept valid stream
This is an automated email from the ASF dual-hosted git repository.
reta pushed a commit to branch 3.2.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git
commit 95a200a6467830eca7432bf9312c514206a4196f
Author: reta <dr...@gmail.com>
AuthorDate: Sun Mar 22 12:58:18 2020 -0400
CXF-8249: SSE client refuses to accept valid stream
(cherry picked from commit 16cbb21aa3b2fb1cb0ad5c049ebe4a9771b11b7c)
(cherry picked from commit 1bd2d4519fa86981d7277ca450b5090ce4c2af55)
# Conflicts:
# systests/spring-boot/pom.xml
# systests/spring-boot/src/test/java/org/apache/cxf/systest/jaxrs/resources/Book.java
---
parent/pom.xml | 5 +
.../jaxrs/sse/client/InboundSseEventProcessor.java | 50 ++++++--
systests/pom.xml | 1 +
.../cxf/systest/jaxrs/sse/AbstractSseTest.java | 28 ++++-
.../apache/cxf/systest/jaxrs/sse/BookStore.java | 39 ++++--
.../apache/cxf/systest/jaxrs/sse/BookStore2.java | 39 ++++--
.../jaxrs/sse/BookStoreClientCloseable.java | 18 ++-
systests/spring-boot/pom.xml | 140 +++++++++++++++++++++
.../apache/cxf/systest/jaxrs/resources/Book.java | 68 ++++++++++
.../jaxrs/spring/boot/SpringSseEmitterTest.java | 132 +++++++++++++++++++
10 files changed, 482 insertions(+), 38 deletions(-)
diff --git a/parent/pom.xml b/parent/pom.xml
index 074ce8a..917fa54 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -817,6 +817,11 @@
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-rs-sse</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-rs-extension-providers</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
index 2412140..016282c 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
@@ -47,11 +47,11 @@ public class InboundSseEventProcessor {
public static final MediaType SERVER_SENT_EVENTS_TYPE = MediaType.valueOf(SERVER_SENT_EVENTS);
private static final Logger LOG = LogUtils.getL7dLogger(InboundSseEventProcessor.class);
- private static final String COMMENT = ": ";
- private static final String EVENT = "event: ";
- private static final String ID = "id: ";
- private static final String RETRY = "retry: ";
- private static final String DATA = "data: ";
+ private static final String COMMENT = ":";
+ private static final String EVENT = "event:";
+ private static final String ID = "id:";
+ private static final String RETRY = "retry:";
+ private static final String DATA = "data:";
private final Endpoint endpoint;
private final InboundSseEventListener listener;
@@ -93,16 +93,23 @@ public class InboundSseEventProcessor {
builder = null; /* reset the builder for next event */
listener.onNext(event);
} else {
+ // Parsing and interpreting event stream:
+ // https://www.w3.org/TR/eventsource/#parsing-an-event-stream
if (line.startsWith(EVENT)) {
- builder = getOrCreate(builder).name(line.substring(EVENT.length()));
+ int beginIndex = findFirstNonSpacePosition(line, EVENT);
+ builder = getOrCreate(builder).name(line.substring(beginIndex));
} else if (line.startsWith(ID)) {
- builder = getOrCreate(builder).id(line.substring(ID.length()));
+ int beginIndex = findFirstNonSpacePosition(line, ID);
+ builder = getOrCreate(builder).id(line.substring(beginIndex));
} else if (line.startsWith(COMMENT)) {
- builder = getOrCreate(builder).comment(line.substring(COMMENT.length()));
+ int beginIndex = findFirstNonSpacePosition(line, COMMENT);
+ builder = getOrCreate(builder).comment(line.substring(beginIndex));
} else if (line.startsWith(RETRY)) {
- builder = getOrCreate(builder).reconnectDelay(line.substring(RETRY.length()));
+ int beginIndex = findFirstNonSpacePosition(line, RETRY);
+ builder = getOrCreate(builder).reconnectDelay(line.substring(beginIndex));
} else if (line.startsWith(DATA)) {
- builder = getOrCreate(builder).appendData(line.substring(DATA.length()));
+ int beginIndex = findFirstNonSpacePosition(line, DATA);
+ builder = getOrCreate(builder).appendData(line.substring(beginIndex));
}
}
line = reader.readLine();
@@ -156,4 +163,27 @@ public class InboundSseEventProcessor {
private static Builder getOrCreate(final Builder builder) {
return (builder == null) ? new InboundSseEventImpl.Builder() : builder;
}
+
+ /**
+ * Remove only leading spaces from the line as per specification, space after
+ * the colon is optional.
+ *
+ * The following stream fires two identical events:
+ *
+ * data:test
+ * data: test
+ *
+ * This is because the space after the colon is ignored if present.
+ */
+ private static int findFirstNonSpacePosition(final String str, final String prefix) {
+ int beginIndex = prefix.length();
+
+ for (; beginIndex < str.length(); ++beginIndex) {
+ if (str.charAt(beginIndex) != ' ') {
+ break;
+ }
+ }
+
+ return beginIndex;
+ }
}
diff --git a/systests/pom.xml b/systests/pom.xml
index ac5e163..9952c00 100644
--- a/systests/pom.xml
+++ b/systests/pom.xml
@@ -54,5 +54,6 @@
<module>ws-transfer</module>
<module>rs-sse</module>
<module>microprofile</module>
+ <module>spring-boot</module>
</modules>
</project>
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
index 0e1158a..87c130a 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
@@ -101,6 +101,28 @@ public abstract class AbstractSseTest extends AbstractSseBaseTest {
}
@Test
+ public void testBookTitlesStreamIsReturnedFromInboundSseEvents() throws InterruptedException {
+ final WebTarget target = createWebTarget("/rest/api/bookstore/titles/sse");
+ final Collection<String> titles = new ArrayList<>();
+
+ try (SseEventSource eventSource = SseEventSource.target(target).build()) {
+ eventSource.register(collectRaw(titles), System.out::println);
+ eventSource.open();
+ // Give the SSE stream some time to collect all events
+ awaitEvents(5000, titles, 4);
+ }
+ // Relaxed verification (hasItems on a subset of titles): strict matching does not work well for Atm + Jetty
+ assertThat(titles,
+ hasItems(
+ "New Book #1",
+ "New Book #2",
+ "New Book #3",
+ "New Book #4"
+ )
+ );
+ }
+
+ @Test
public void testNoDataIsReturnedFromInboundSseEvents() throws InterruptedException {
final WebTarget target = createWebTarget("/rest/api/bookstore/nodata");
final Collection<Book> books = new ArrayList<>();
@@ -310,7 +332,11 @@ public abstract class AbstractSseTest extends AbstractSseBaseTest {
return false;
}
- private static Consumer<InboundSseEvent> collect(final Collection< Book > books) {
+ private static Consumer<InboundSseEvent> collect(final Collection<Book> books) {
return event -> books.add(event.readData(Book.class, MediaType.APPLICATION_JSON_TYPE));
}
+
+ private static Consumer<InboundSseEvent> collectRaw(final Collection<String> titles) {
+ return event -> titles.add(event.readData(String.class, MediaType.TEXT_PLAIN_TYPE));
+ }
}
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
index 41ffd34..de28eef 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
@@ -78,13 +78,13 @@ public class BookStore extends BookStoreClientCloseable {
final Integer id = Integer.valueOf(lastEventId);
final Builder builder = sse.newEventBuilder();
- sink.send(createStatsEvent(builder.name("book"), id + 1));
+ sink.send(createEvent(builder.name("book"), id + 1));
Thread.sleep(200);
- sink.send(createStatsEvent(builder.name("book"), id + 2));
+ sink.send(createEvent(builder.name("book"), id + 2));
Thread.sleep(200);
- sink.send(createStatsEvent(builder.name("book"), id + 3));
+ sink.send(createEvent(builder.name("book"), id + 3));
Thread.sleep(200);
- sink.send(createStatsEvent(builder.name("book"), id + 4));
+ sink.send(createEvent(builder.name("book"), id + 4));
Thread.sleep(200);
sink.close();
} catch (final InterruptedException ex) {
@@ -102,11 +102,28 @@ public class BookStore extends BookStoreClientCloseable {
CompletableFuture
.runAsync(() -> {
- sink.send(createStatsEvent(builder.name("book"), 1));
- sink.send(createStatsEvent(builder.name("book"), 2));
- sink.send(createStatsEvent(builder.name("book"), 3));
- sink.send(createStatsEvent(builder.name("book"), 4));
- sink.send(createStatsEvent(builder.name("book"), 5));
+ sink.send(createEvent(builder.name("book"), 1));
+ sink.send(createEvent(builder.name("book"), 2));
+ sink.send(createEvent(builder.name("book"), 3));
+ sink.send(createEvent(builder.name("book"), 4));
+ sink.send(createEvent(builder.name("book"), 5));
+ })
+ .whenComplete((r, ex) -> sink.close());
+ }
+
+ @GET
+ @Path("/titles/sse")
+ @Produces(MediaType.SERVER_SENT_EVENTS)
+ public void forBookTitlesOnly(@Context SseEventSink sink) {
+ final Builder builder = sse.newEventBuilder();
+
+ CompletableFuture
+ .runAsync(() -> {
+ sink.send(createRawEvent(builder.name("book"), 1));
+ sink.send(createRawEvent(builder.name("book"), 2));
+ sink.send(createRawEvent(builder.name("book"), 3));
+ sink.send(createRawEvent(builder.name("book"), 4));
+ sink.send(createRawEvent(builder.name("book"), 5));
})
.whenComplete((r, ex) -> sink.close());
}
@@ -139,8 +156,8 @@ public class BookStore extends BookStoreClientCloseable {
}
final Builder builder = sse.newEventBuilder();
- broadcaster.broadcast(createStatsEvent(builder.name("book"), 1000))
- .thenAcceptBoth(broadcaster.broadcast(createStatsEvent(builder.name("book"), 2000)), (a, b) -> { })
+ broadcaster.broadcast(createEvent(builder.name("book"), 1000))
+ .thenAcceptBoth(broadcaster.broadcast(createEvent(builder.name("book"), 2000)), (a, b) -> { })
.whenComplete((r, ex) -> {
if (broadcaster != null) {
broadcaster.close();
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
index d7abb04..a922d08 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
@@ -77,13 +77,13 @@ public class BookStore2 extends BookStoreClientCloseable {
final Integer id = Integer.valueOf(lastEventId);
final Builder builder = sse.newEventBuilder();
- sink.send(createStatsEvent(builder.name("book"), id + 1));
+ sink.send(createEvent(builder.name("book"), id + 1));
Thread.sleep(200);
- sink.send(createStatsEvent(builder.name("book"), id + 2));
+ sink.send(createEvent(builder.name("book"), id + 2));
Thread.sleep(200);
- sink.send(createStatsEvent(builder.name("book"), id + 3));
+ sink.send(createEvent(builder.name("book"), id + 3));
Thread.sleep(200);
- sink.send(createStatsEvent(builder.name("book"), id + 4));
+ sink.send(createEvent(builder.name("book"), id + 4));
Thread.sleep(200);
sink.close();
} catch (final InterruptedException ex) {
@@ -101,11 +101,28 @@ public class BookStore2 extends BookStoreClientCloseable {
CompletableFuture
.runAsync(() -> {
- sink.send(createStatsEvent(builder.name("book"), 1));
- sink.send(createStatsEvent(builder.name("book"), 2));
- sink.send(createStatsEvent(builder.name("book"), 3));
- sink.send(createStatsEvent(builder.name("book"), 4));
- sink.send(createStatsEvent(builder.name("book"), 5));
+ sink.send(createEvent(builder.name("book"), 1));
+ sink.send(createEvent(builder.name("book"), 2));
+ sink.send(createEvent(builder.name("book"), 3));
+ sink.send(createEvent(builder.name("book"), 4));
+ sink.send(createEvent(builder.name("book"), 5));
+ })
+ .whenComplete((r, ex) -> sink.close());
+ }
+
+ @GET
+ @Path("/titles/sse")
+ @Produces(MediaType.SERVER_SENT_EVENTS)
+ public void forBookTitlesOnly(@Context SseEventSink sink) {
+ final Builder builder = sse.newEventBuilder();
+
+ CompletableFuture
+ .runAsync(() -> {
+ sink.send(createRawEvent(builder.name("book"), 1));
+ sink.send(createRawEvent(builder.name("book"), 2));
+ sink.send(createRawEvent(builder.name("book"), 3));
+ sink.send(createRawEvent(builder.name("book"), 4));
+ sink.send(createRawEvent(builder.name("book"), 5));
})
.whenComplete((r, ex) -> sink.close());
}
@@ -138,8 +155,8 @@ public class BookStore2 extends BookStoreClientCloseable {
}
final Builder builder = sse.newEventBuilder();
- broadcaster.broadcast(createStatsEvent(builder.name("book"), 1000))
- .thenAcceptBoth(broadcaster.broadcast(createStatsEvent(builder.name("book"), 2000)), (a, b) -> { })
+ broadcaster.broadcast(createEvent(builder.name("book"), 1000))
+ .thenAcceptBoth(broadcaster.broadcast(createEvent(builder.name("book"), 2000)), (a, b) -> { })
.whenComplete((r, ex) -> {
if (broadcaster != null) {
broadcaster.close();
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStoreClientCloseable.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStoreClientCloseable.java
index b9b599d..2fb50a6 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStoreClientCloseable.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStoreClientCloseable.java
@@ -64,14 +64,14 @@ abstract class BookStoreClientCloseable {
localBroadcaster.onClose(sseEventSink -> stats.closed());
localBroadcaster.register(sink);
- localBroadcaster.broadcast(createStatsEvent(builder.name("book"), id + 1))
+ localBroadcaster.broadcast(createEvent(builder.name("book"), id + 1))
.whenComplete((r, ex) -> stats.inc());
// Await client to confirm the it got the event (PUT /client-closes-connection/received)
phaser.arriveAndAwaitAdvance();
Thread.sleep(500);
- localBroadcaster.broadcast(createStatsEvent(builder.name("book"), id + 2))
+ localBroadcaster.broadcast(createEvent(builder.name("book"), id + 2))
.whenComplete((r, ex) -> {
// we expect exception here
if (ex == null && !sink.isClosed()) {
@@ -85,7 +85,7 @@ abstract class BookStoreClientCloseable {
// This event should complete exceptionally since SseEventSource should be
// closed already.
Thread.sleep(500);
- localBroadcaster.broadcast(createStatsEvent(builder.name("book"), id + 3))
+ localBroadcaster.broadcast(createEvent(builder.name("book"), id + 3))
.whenComplete((r, ex) -> {
// we expect exception here
if (ex == null && !sink.isClosed()) {
@@ -96,7 +96,7 @@ abstract class BookStoreClientCloseable {
// This event should complete immediately since the sink has been removed
// from the broadcaster (closed).
Thread.sleep(500);
- localBroadcaster.broadcast(createStatsEvent(builder.name("book"), id + 4))
+ localBroadcaster.broadcast(createEvent(builder.name("book"), id + 4))
.whenComplete((r, ex) -> {
// we expect the sink to be closed at this point
if (ex != null || !sink.isClosed()) {
@@ -136,11 +136,19 @@ abstract class BookStoreClientCloseable {
return stats;
}
- protected static OutboundSseEvent createStatsEvent(final OutboundSseEvent.Builder builder, final int eventId) {
+ protected static OutboundSseEvent createEvent(final OutboundSseEvent.Builder builder, final int eventId) {
return builder
.id(Integer.toString(eventId))
.data(Book.class, new Book("New Book #" + eventId, eventId))
.mediaType(MediaType.APPLICATION_JSON_TYPE)
.build();
}
+
+ protected static OutboundSseEvent createRawEvent(final OutboundSseEvent.Builder builder, final int eventId) {
+ return builder
+ .id(Integer.toString(eventId))
+ .data("New Book #" + eventId)
+ .mediaType(MediaType.TEXT_PLAIN_TYPE)
+ .build();
+ }
}
diff --git a/systests/spring-boot/pom.xml b/systests/spring-boot/pom.xml
new file mode 100644
index 0000000..4196238
--- /dev/null
+++ b/systests/spring-boot/pom.xml
@@ -0,0 +1,140 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <parent>
+ <artifactId>cxf-parent</artifactId>
+ <groupId>org.apache.cxf</groupId>
+ <version>3.2.13-SNAPSHOT</version>
+ <relativePath>../../parent/pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.cxf.systests</groupId>
+ <artifactId>cxf-systests-spring-boot</artifactId>
+ <name>Apache CXF Spring Boot Integration System Tests</name>
+ <description>Apache CXF Spring Boot Integration System Tests</description>
+ <url>https://cxf.apache.org</url>
+
+ <properties>
+ <cxf.module.name>org.apache.cxf.systests.spring.boot</cxf.module.name>
+ </properties>
+
+ <build>
+ <testSourceDirectory>${basedir}/src/test/java</testSourceDirectory>
+ <testResources>
+ <testResource>
+ <directory>src/test/java</directory>
+ <excludes>
+ <exclude>**/*.java</exclude>
+ </excludes>
+ </testResource>
+ <testResource>
+ <directory>src/test/resources</directory>
+ <includes>
+ <include>**/*</include>
+ </includes>
+ </testResource>
+ </testResources>
+ <plugins>
+ <plugin>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ <configuration>
+ <archive>
+ <manifestEntries>
+ <Automatic-Module-Name>${cxf.module.name}.tests</Automatic-Module-Name>
+ </manifestEntries>
+ </archive>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>javax.validation</groupId>
+ <artifactId>validation-api</artifactId>
+ <version>2.0.1.Final</version>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jcl-over-slf4j</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-spring-boot-starter-jaxrs</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-frontend-jaxrs</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-rs-client</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-rs-sse</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-rs-service-description-openapi-v3</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.jaxrs</groupId>
+ <artifactId>jackson-jaxrs-json-provider</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-testutils</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-test</artifactId>
+ <version>${cxf.spring.boot.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.xmlunit</groupId>
+ <artifactId>xmlunit-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/systests/spring-boot/src/test/java/org/apache/cxf/systest/jaxrs/resources/Book.java b/systests/spring-boot/src/test/java/org/apache/cxf/systest/jaxrs/resources/Book.java
new file mode 100644
index 0000000..0dc4165
--- /dev/null
+++ b/systests/spring-boot/src/test/java/org/apache/cxf/systest/jaxrs/resources/Book.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.resources;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+
+public class Book {
+ private String title;
+ private String author;
+
+ public Book() {
+ }
+
+ public Book(final String title, final String author) {
+ this.title = title;
+ this.author = author;
+ }
+
+ public String getTitle() {
+ return title;
+ }
+
+ public void setTitle(String title) {
+ this.title = title;
+ }
+
+ public String getAuthor() {
+ return author;
+ }
+
+ public void setAuthor(String author) {
+ this.author = author;
+ }
+
+ @Override
+ public int hashCode() {
+ return HashCodeBuilder.reflectionHashCode(this);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return EqualsBuilder.reflectionEquals(this, obj);
+ }
+
+ @Override
+ public String toString() {
+ return ToStringBuilder.reflectionToString(this);
+ }
+}
diff --git a/systests/spring-boot/src/test/java/org/apache/cxf/systest/jaxrs/spring/boot/SpringSseEmitterTest.java b/systests/spring-boot/src/test/java/org/apache/cxf/systest/jaxrs/spring/boot/SpringSseEmitterTest.java
new file mode 100644
index 0000000..e4c10cc
--- /dev/null
+++ b/systests/spring-boot/src/test/java/org/apache/cxf/systest/jaxrs/spring/boot/SpringSseEmitterTest.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.spring.boot;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.sse.InboundSseEvent;
+import javax.ws.rs.sse.SseEventSource;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.systest.jaxrs.resources.Book;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.context.embedded.LocalServerPort;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
+import org.springframework.http.MediaType;
+import org.springframework.test.context.junit4.SpringRunner;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
+import org.springframework.web.servlet.mvc.method.annotation.SseEmitter.SseEventBuilder;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, classes = SpringSseEmitterTest.LibraryController.class)
+public class SpringSseEmitterTest {
+ @LocalServerPort
+ private int port;
+
+ @RestController
+ @EnableAutoConfiguration
+ static class LibraryController {
+ @GetMapping("/sse")
+ public SseEmitter streamSseMvc() {
+ final SseEmitter emitter = new SseEmitter();
+ final ExecutorService sseMvcExecutor = Executors.newSingleThreadExecutor();
+
+ sseMvcExecutor.execute(() -> {
+ try {
+ for (int eventId = 1; eventId <= 5; ++eventId) {
+ SseEventBuilder event = SseEmitter.event()
+ .id(Integer.toString(eventId))
+ .data(new Book("New Book #" + eventId, "Author #" + eventId), MediaType.APPLICATION_JSON)
+ .name("book");
+ emitter.send(event);
+ Thread.sleep(100);
+ }
+ } catch (Exception ex) {
+ emitter.completeWithError(ex);
+ }
+ });
+
+ return emitter;
+ }
+ }
+
+ @Test
+ public void testSseEvents() throws InterruptedException {
+ final WebTarget target = createWebTarget();
+ final Collection<Book> books = new ArrayList<>();
+
+ try (SseEventSource eventSource = SseEventSource.target(target).build()) {
+ eventSource.register(collect(books), System.out::println);
+ eventSource.open();
+ // Give the SSE stream some time to collect all events
+ awaitEvents(5000, books, 5);
+ }
+
+ assertThat(books,
+ hasItems(
+ new Book("New Book #1", "Author #1"),
+ new Book("New Book #2", "Author #2"),
+ new Book("New Book #3", "Author #3"),
+ new Book("New Book #4", "Author #4"),
+ new Book("New Book #5", "Author #5")
+ )
+ );
+ }
+
+ private WebTarget createWebTarget() {
+ return ClientBuilder
+ .newClient()
+ .property("http.receive.timeout", 8000)
+ .register(JacksonJsonProvider.class)
+ .target("http://localhost:" + port + "/sse");
+ }
+
+ private static Consumer<InboundSseEvent> collect(final Collection< Book > books) {
+ return event -> books.add(event.readData(Book.class, javax.ws.rs.core.MediaType.APPLICATION_JSON_TYPE));
+ }
+
+ private void awaitEvents(long timeout, final Collection<?> events, int size) throws InterruptedException {
+ final long sleep = timeout / 10;
+
+ for (int i = 0; i < timeout; i += sleep) {
+ if (events.size() == size) {
+ break;
+ } else {
+ Thread.sleep(sleep);
+ }
+ }
+ }
+}