You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2021/06/28 22:53:20 UTC
[cxf] branch master updated: [CXF-8544] JAX-RS server-side SSE
using POST (#812)
This is an automated email from the ASF dual-hosted git repository.
reta pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cxf.git
The following commit(s) were added to refs/heads/master by this push:
new 422f5ba [CXF-8544] JAX-RS server-side SSE using POST (#812)
422f5ba is described below
commit 422f5ba220c7516302342655d438e38107ea11e1
Author: Tim Ward <ti...@apache.org>
AuthorDate: Mon Jun 28 23:53:10 2021 +0100
[CXF-8544] JAX-RS server-side SSE using POST (#812)
Create a test to ensure SSE works for POST resource methods.
N.B. This was *not* the case until CXF-8559 was fixed.
---
.../cxf/systest/jaxrs/sse/AbstractSseTest.java | 71 ++++++++++++++++++++++
.../apache/cxf/systest/jaxrs/sse/BookStore.java | 29 +++++++++
.../apache/cxf/systest/jaxrs/sse/BookStore2.java | 29 +++++++++
3 files changed, 129 insertions(+)
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
index 8a82b21..e02a001 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
@@ -18,6 +18,12 @@
*/
package org.apache.cxf.systest.jaxrs.sse;
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -26,16 +32,19 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
+import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.ext.MessageBodyReader;
import javax.ws.rs.sse.InboundSseEvent;
import javax.ws.rs.sse.SseEventSource;
import javax.ws.rs.sse.SseEventSource.Builder;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
import org.junit.Before;
import org.junit.Test;
@@ -45,7 +54,10 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
public abstract class AbstractSseTest extends AbstractSseBaseTest {
@Before
@@ -103,6 +115,65 @@ public abstract class AbstractSseTest extends AbstractSseBaseTest {
);
}
+    /**
+     * Verifies that a POST resource method can stream server-sent events.
+     * <p>
+     * The test posts the plain-text entity {@code 42} and reads the raw response stream
+     * line by line. Each event is expected to consist of an {@code event: book} line, an
+     * {@code id:} line and a {@code data:} line holding the JSON form of a {@link Book};
+     * a blank line terminates the event and causes the decoded book to be collected.
+     *
+     * @throws InterruptedException declared like the neighbouring tests; nothing in the
+     *         visible body throws it directly
+     * @throws IOException if reading the response stream or decoding a book fails
+     */
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testBooksStreamIsReturnedFromInboundSseEventsWithPOST() throws InterruptedException, IOException {
+        final WebTarget target = createWebTarget("/rest/api/bookstore/sse/0");
+        final Collection<Book> books = new ArrayList<>();
+
+        // The Jackson provider is used directly to turn each "data:" payload into a Book.
+        @SuppressWarnings("rawtypes")
+        MessageBodyReader mbr = new JacksonJsonProvider();
+
+        Response response = target.request(MediaType.SERVER_SENT_EVENTS)
+            .post(Entity.entity(42, MediaType.TEXT_PLAIN));
+
+        // Decode the stream explicitly as UTF-8 instead of relying on the platform default
+        // charset; the payload is converted back to bytes with UTF-8 further down.
+        try (BufferedReader br = new BufferedReader(
+                new InputStreamReader(response.readEntity(InputStream.class), StandardCharsets.UTF_8))) {
+            String s;
+            Integer id = null;
+            Book book = null;
+
+            while ((s = br.readLine()) != null) {
+                if (s.trim().isEmpty()) {
+                    // A blank line ends the current event (or is padding between events).
+                    if (id == null && book == null) {
+                        continue;
+                    } else if (id != null && book != null) {
+                        books.add(book);
+                        id = null;
+                        book = null;
+                        continue;
+                    }
+                    fail("The event did not contain both an id " + id + " and a book " + book);
+                }
+                if (s.startsWith("event:")) {
+                    assertEquals("Not a book event", "event: book", s.trim());
+                    continue;
+                }
+                if (s.startsWith("id:")) {
+                    assertNull("There was an existing id " + id, id);
+                    id = Integer.parseInt(s.substring(3).trim());
+                    continue;
+                }
+                if (s.startsWith("data:")) {
+                    assertNull("There was an existing book " + book, book);
+                    book = (Book) mbr.readFrom(Book.class, Book.class, null, MediaType.APPLICATION_JSON_TYPE, null,
+                            new ByteArrayInputStream(s.substring(5).trim().getBytes(StandardCharsets.UTF_8)));
+                    continue;
+                }
+                fail("Unexpected String content returned by SSE POST " + s);
+            }
+        }
+
+        // Easing the test verification here, it does not work well for Atm + Jetty
+        assertThat(books,
+            hasItems(
+                new Book("New Book #43", 43),
+                new Book("New Book #44", 44),
+                new Book("New Book #45", 45),
+                new Book("New Book #46", 46)
+            )
+        );
+    }
+
@Test
public void testBookTitlesStreamIsReturnedFromInboundSseEvents() throws InterruptedException {
final WebTarget target = createWebTarget("/rest/api/bookstore/titles/sse");
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
index 7fa54c5..c6561bd 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
@@ -94,6 +95,34 @@ public class BookStore extends BookStoreClientCloseable {
}.start();
}
+    /**
+     * Streams four "book" events in response to a POST request.
+     * <p>
+     * The plain-text request body is parsed as an integer and the events are created
+     * for the four values that follow it, with a 200 ms pause after each one. The work
+     * runs on a newly started thread, so this method returns before the events are sent.
+     *
+     * @param sink the event sink the events are written to; always closed when the
+     *        sending thread finishes, whether it completes or fails
+     * @param id the {@code id} path parameter (not used by this method)
+     * @param lastEventId the request body, expected to hold an integer
+     */
+    @POST
+    @Path("sse/{id}")
+    @Produces(MediaType.SERVER_SENT_EVENTS)
+    @Consumes(MediaType.TEXT_PLAIN)
+    public void forBookPOST(@Context SseEventSink sink, @PathParam("id") final String id,
+            final String lastEventId) {
+        new Thread() {
+            @Override
+            public void run() {
+                try {
+                    // Named differently from the "id" path parameter to avoid shadowing it.
+                    final int lastId = Integer.parseInt(lastEventId);
+                    final Builder builder = sse.newEventBuilder();
+
+                    for (int offset = 1; offset <= 4; offset++) {
+                        sink.send(createEvent(builder.name("book"), lastId + offset));
+                        Thread.sleep(200);
+                    }
+                } catch (final InterruptedException ex) {
+                    // Restore the interrupt flag so the interruption is not lost.
+                    Thread.currentThread().interrupt();
+                    LOG.error("Communication error", ex);
+                } finally {
+                    // Close on every path (interruption, malformed body) so the client
+                    // is not left waiting on an open stream.
+                    sink.close();
+                }
+            }
+        }.start();
+    }
+
@GET
@Path("nodelay/sse/{id}")
@Produces(MediaType.SERVER_SENT_EVENTS)
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
index a922d08..414d84f 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
@@ -92,7 +93,35 @@ public class BookStore2 extends BookStoreClientCloseable {
}
}.start();
}
+
+    /**
+     * Streams four "book" events in response to a POST request.
+     * <p>
+     * The plain-text request body is parsed as an integer and the events are created
+     * for the four values that follow it, with a 200 ms pause after each one. The work
+     * runs on a newly started thread, so this method returns before the events are sent.
+     *
+     * @param sink the event sink the events are written to; always closed when the
+     *        sending thread finishes, whether it completes or fails
+     * @param id the {@code id} path parameter (not used by this method)
+     * @param lastEventId the request body, expected to hold an integer
+     */
+    @POST
+    @Path("sse/{id}")
+    @Produces(MediaType.SERVER_SENT_EVENTS)
+    @Consumes(MediaType.TEXT_PLAIN)
+    public void forBookPOST(@Context SseEventSink sink, @PathParam("id") final String id,
+            final String lastEventId) {
+        new Thread() {
+            @Override
+            public void run() {
+                try {
+                    // Named differently from the "id" path parameter to avoid shadowing it.
+                    final int lastId = Integer.parseInt(lastEventId);
+                    final Builder builder = sse.newEventBuilder();
+
+                    for (int offset = 1; offset <= 4; offset++) {
+                        sink.send(createEvent(builder.name("book"), lastId + offset));
+                        Thread.sleep(200);
+                    }
+                } catch (final InterruptedException ex) {
+                    // Restore the interrupt flag so the interruption is not lost.
+                    Thread.currentThread().interrupt();
+                    LOG.error("Communication error", ex);
+                } finally {
+                    // Close on every path (interruption, malformed body) so the client
+                    // is not left waiting on an open stream.
+                    sink.close();
+                }
+            }
+        }.start();
+    }
+
@GET
@Path("nodelay/sse/{id}")
@Produces(MediaType.SERVER_SENT_EVENTS)