You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2021/06/28 22:53:20 UTC

[cxf] branch master updated: [CXF-8544] JAX-RS server-side SSE using POST (#812)

This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cxf.git


The following commit(s) were added to refs/heads/master by this push:
     new 422f5ba  [CXF-8544] JAX-RS server-side SSE using POST (#812)
422f5ba is described below

commit 422f5ba220c7516302342655d438e38107ea11e1
Author: Tim Ward <ti...@apache.org>
AuthorDate: Mon Jun 28 23:53:10 2021 +0100

    [CXF-8544] JAX-RS server-side SSE using POST (#812)
    
    Create a test to ensure SSE works for POST resource methods.
    N.B. This was *not* the case until CXF-8559 was fixed.
---
 .../cxf/systest/jaxrs/sse/AbstractSseTest.java     | 71 ++++++++++++++++++++++
 .../apache/cxf/systest/jaxrs/sse/BookStore.java    | 29 +++++++++
 .../apache/cxf/systest/jaxrs/sse/BookStore2.java   | 29 +++++++++
 3 files changed, 129 insertions(+)

diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
index 8a82b21..e02a001 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
@@ -18,6 +18,12 @@
  */
 package org.apache.cxf.systest.jaxrs.sse;
 
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -26,16 +32,19 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
+import javax.ws.rs.client.Entity;
 import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.ext.MessageBodyReader;
 import javax.ws.rs.sse.InboundSseEvent;
 import javax.ws.rs.sse.SseEventSource;
 import javax.ws.rs.sse.SseEventSource.Builder;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -45,7 +54,10 @@ import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.hasItems;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 
 public abstract class AbstractSseTest extends AbstractSseBaseTest {
     @Before
@@ -103,6 +115,65 @@ public abstract class AbstractSseTest extends AbstractSseBaseTest {
         );
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testBooksStreamIsReturnedFromInboundSseEventsWithPOST() throws InterruptedException, IOException {
+        final WebTarget target = createWebTarget("/rest/api/bookstore/sse/0");
+        final Collection<Book> books = new ArrayList<>();
+        
+        @SuppressWarnings("rawtypes")
+        MessageBodyReader mbr = new JacksonJsonProvider();
+        
+        Response response = target.request(MediaType.SERVER_SENT_EVENTS)
+            .post(Entity.entity(42, MediaType.TEXT_PLAIN));
+        
+        try (BufferedReader br = new BufferedReader(new InputStreamReader(response.readEntity(InputStream.class)))) {
+            String s;
+            Integer id = null;
+            Book book = null;
+            
+            while ((s = br.readLine()) != null) {
+                if (s.trim().isEmpty()) {
+                    if (id == null && book == null) {
+                        continue;
+                    } else if (id != null && book != null) {
+                        books.add(book);
+                        id = null;
+                        book = null;
+                        continue;
+                    }
+                    fail("The event did not contain both an id " + id + " and a book " + book);
+                }
+                if (s.startsWith("event:")) {
+                    assertEquals("Not a book event", "event: book", s.trim());
+                    continue;
+                }
+                if (s.startsWith("id:")) {
+                    assertNull("There was an existing id " + id, id);
+                    id = Integer.parseInt(s.substring(3).trim());
+                    continue;
+                }
+                if (s.startsWith("data:")) {
+                    assertNull("There was an existing book " + book, book);
+                    book = (Book) mbr.readFrom(Book.class, Book.class, null, MediaType.APPLICATION_JSON_TYPE, null, 
+                            new ByteArrayInputStream(s.substring(5).trim().getBytes(StandardCharsets.UTF_8)));
+                    continue;
+                }
+                fail("Unexpected String content returned by SSE POST " + s);
+            }
+        }
+    
+        // Easing the test verification here, it does not work well for Atm + Jetty
+        assertThat(books,
+                hasItems(
+                        new Book("New Book #43", 43),
+                        new Book("New Book #44", 44),
+                        new Book("New Book #45", 45),
+                        new Book("New Book #46", 46)
+                        )
+        );
+    }
+
     @Test
     public void testBookTitlesStreamIsReturnedFromInboundSseEvents() throws InterruptedException {
         final WebTarget target = createWebTarget("/rest/api/bookstore/titles/sse");
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
index 7fa54c5..c6561bd 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import javax.ws.rs.Consumes;
 import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
 import javax.ws.rs.HeaderParam;
@@ -94,6 +95,34 @@ public class BookStore extends BookStoreClientCloseable {
         }.start();
     }
     
+    @POST
+    @Path("sse/{id}")
+    @Produces(MediaType.SERVER_SENT_EVENTS)
+    @Consumes(MediaType.TEXT_PLAIN)
+    public void forBookPOST(@Context SseEventSink sink, @PathParam("id") final String id,
+            final String lastEventId) {
+        new Thread() {
+            public void run() {
+                try {
+                    final Integer id = Integer.valueOf(lastEventId);
+                    final Builder builder = sse.newEventBuilder();
+
+                    sink.send(createEvent(builder.name("book"), id + 1));
+                    Thread.sleep(200);
+                    sink.send(createEvent(builder.name("book"), id + 2));
+                    Thread.sleep(200);
+                    sink.send(createEvent(builder.name("book"), id + 3));
+                    Thread.sleep(200);
+                    sink.send(createEvent(builder.name("book"), id + 4));
+                    Thread.sleep(200);
+                    sink.close();
+                } catch (final InterruptedException ex) {
+                    LOG.error("Communication error", ex);
+                }
+            }
+        }.start();
+    }
+    
     @GET
     @Path("nodelay/sse/{id}")
     @Produces(MediaType.SERVER_SENT_EVENTS)
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
index a922d08..414d84f 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import javax.ws.rs.Consumes;
 import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
 import javax.ws.rs.HeaderParam;
@@ -92,7 +93,35 @@ public class BookStore2 extends BookStoreClientCloseable {
             }
         }.start();
     }
+    
+    @POST
+    @Path("sse/{id}")
+    @Produces(MediaType.SERVER_SENT_EVENTS)
+    @Consumes(MediaType.TEXT_PLAIN)
+    public void forBookPOST(@Context SseEventSink sink, @PathParam("id") final String id,
+            final String lastEventId) {
+        new Thread() {
+            public void run() {
+                try {
+                    final Integer id = Integer.valueOf(lastEventId);
+                    final Builder builder = sse.newEventBuilder();
 
+                    sink.send(createEvent(builder.name("book"), id + 1));
+                    Thread.sleep(200);
+                    sink.send(createEvent(builder.name("book"), id + 2));
+                    Thread.sleep(200);
+                    sink.send(createEvent(builder.name("book"), id + 3));
+                    Thread.sleep(200);
+                    sink.send(createEvent(builder.name("book"), id + 4));
+                    Thread.sleep(200);
+                    sink.close();
+                } catch (final InterruptedException ex) {
+                    LOG.error("Communication error", ex);
+                }
+            }
+        }.start();
+    }
+    
     @GET
     @Path("nodelay/sse/{id}")
     @Produces(MediaType.SERVER_SENT_EVENTS)