You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2017/08/18 00:55:06 UTC

cxf git commit: CXF-7085: Introduce support for Server-Sent Events (Client). Add more test cases and fix several issues in the SSE protocol (reader/writer) implementation

Repository: cxf
Updated Branches:
  refs/heads/master 9890add2c -> 4cef686cb


CXF-7085: Introduce support for Server-Sent Events (Client). Add more test cases and fix several issues in the SSE protocol (reader/writer) implementation


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/4cef686c
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/4cef686c
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/4cef686c

Branch: refs/heads/master
Commit: 4cef686cbbfcc8ec99042b525b44e5cc12647e1d
Parents: 9890add
Author: reta <dr...@gmail.com>
Authored: Thu Aug 17 20:54:47 2017 -0400
Committer: reta <dr...@gmail.com>
Committed: Thu Aug 17 20:54:47 2017 -0400

----------------------------------------------------------------------
 .../src/main/resources/web-ui/index.html        |   2 +-
 .../demo/jaxrs/sse/StatsRestServiceImpl.java    |  16 +--
 .../src/main/resources/web-ui/index.html        |   2 +-
 .../jaxrs/sse/OutboundSseEventBodyWriter.java   |   2 +-
 .../jaxrs/sse/client/InboundSseEventImpl.java   |  10 +-
 .../sse/client/InboundSseEventProcessor.java    |  36 +++---
 .../sse/client/SseEventSourceImplTest.java      | 118 ++++++++++++++++++-
 7 files changed, 154 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/4cef686c/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/resources/web-ui/index.html
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/resources/web-ui/index.html b/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/resources/web-ui/index.html
index d79f804..e7b4743 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/resources/web-ui/index.html
+++ b/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/resources/web-ui/index.html
@@ -33,7 +33,7 @@
 	if( !!window.EventSource ) {
 	    var event = new EventSource("http://localhost:8686/rest/api/stats/sse");
 	
-	    event.addEventListener('message', function( event ) {	
+	    event.addEventListener('stats', function( event ) {	
 	    	var datapoint = jQuery.parseJSON( event.data );
 	    	 
 	    	chart.series[ 0 ].addPoint({

http://git-wip-us.apache.org/repos/asf/cxf/blob/4cef686c/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java b/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
index 28730a3..0a15153 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
+++ b/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
@@ -53,21 +53,21 @@ public class StatsRestServiceImpl {
             public void run() {
                 try {
                     final Builder builder = sse.newEventBuilder();
-                    sink.send(createStatsEvent(builder.name("stats"), 1));
+                    sink.send(createStatsEvent(builder, 1));
                     Thread.sleep(1000);
-                    sink.send(createStatsEvent(builder.name("stats"), 2));
+                    sink.send(createStatsEvent(builder, 2));
                     Thread.sleep(1000);
-                    sink.send(createStatsEvent(builder.name("stats"), 3));
+                    sink.send(createStatsEvent(builder, 3));
                     Thread.sleep(1000);
-                    sink.send(createStatsEvent(builder.name("stats"), 4));
+                    sink.send(createStatsEvent(builder, 4));
                     Thread.sleep(1000);
-                    sink.send(createStatsEvent(builder.name("stats"), 5));
+                    sink.send(createStatsEvent(builder, 5));
                     Thread.sleep(1000);
-                    sink.send(createStatsEvent(builder.name("stats"), 6));
+                    sink.send(createStatsEvent(builder, 6));
                     Thread.sleep(1000);
-                    sink.send(createStatsEvent(builder.name("stats"), 7));
+                    sink.send(createStatsEvent(builder, 7));
                     Thread.sleep(1000);
-                    sink.send(createStatsEvent(builder.name("stats"), 8));
+                    sink.send(createStatsEvent(builder, 8));
                     sink.close();
                 } catch (final Exception e) {
                     e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/cxf/blob/4cef686c/distribution/src/main/release/samples/jax_rs/sse_tomcat/src/main/resources/web-ui/index.html
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/sse_tomcat/src/main/resources/web-ui/index.html b/distribution/src/main/release/samples/jax_rs/sse_tomcat/src/main/resources/web-ui/index.html
index 2aaed6e..2b58b30 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_tomcat/src/main/resources/web-ui/index.html
+++ b/distribution/src/main/release/samples/jax_rs/sse_tomcat/src/main/resources/web-ui/index.html
@@ -33,7 +33,7 @@
 	if( !!window.EventSource ) {
 	    var event = new EventSource("http://localhost:8686/rest/api/stats/sse/1");
 	
-	    event.addEventListener('message', function( event ) {	
+	    event.addEventListener('stats', function( event ) {	
 	    	var datapoint = jQuery.parseJSON( event.data );
 	    	 
 	    	chart.series[ 0 ].addPoint({

http://git-wip-us.apache.org/repos/asf/cxf/blob/4cef686c/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventBodyWriter.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventBodyWriter.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventBodyWriter.java
index 085df51..a97020f 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventBodyWriter.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventBodyWriter.java
@@ -43,7 +43,7 @@ public class OutboundSseEventBodyWriter implements MessageBodyWriter<OutboundSse
     public static final MediaType SERVER_SENT_EVENTS_TYPE = MediaType.valueOf(SERVER_SENT_EVENTS);
 
     private static final byte[] COMMENT = ": ".getBytes(StandardCharsets.UTF_8);
-    private static final byte[] EVENT = "    ".getBytes(StandardCharsets.UTF_8);
+    private static final byte[] EVENT = "event: ".getBytes(StandardCharsets.UTF_8);
     private static final byte[] ID = "id: ".getBytes(StandardCharsets.UTF_8);
     private static final byte[] RETRY = "retry: ".getBytes(StandardCharsets.UTF_8);
     private static final byte[] DATA = "data: ".getBytes(StandardCharsets.UTF_8);

http://git-wip-us.apache.org/repos/asf/cxf/blob/4cef686c/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventImpl.java
index f173576..a13b591 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventImpl.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventImpl.java
@@ -50,20 +50,24 @@ public final class InboundSseEventImpl implements InboundSseEvent {
     static class Builder {
         private static final Logger LOG = LogUtils.getL7dLogger(Builder.class);
 
-        private final String name;
+        private String name; /* the default event type would be "message" */
         private String id;
         private String comment;
         private OptionalLong reconnectDelay = OptionalLong.empty();
         private String data;
 
-        Builder(String name) {
-            this.name = name;
+        Builder() {
         }
 
         Builder id(String i) {
             this.id = i;
             return this;
         }
+        
+        Builder name(String n) {
+            this.name = n;
+            return this;
+        }
 
         Builder comment(String cmt) {
             this.comment = cmt;

http://git-wip-us.apache.org/repos/asf/cxf/blob/4cef686c/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
index 2c0b8b5..4d88ee7 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
@@ -35,6 +35,7 @@ import org.apache.cxf.common.util.StringUtils;
 import org.apache.cxf.endpoint.Endpoint;
 import org.apache.cxf.jaxrs.client.ClientProviderFactory;
 import org.apache.cxf.jaxrs.impl.ResponseImpl;
+import org.apache.cxf.jaxrs.sse.client.InboundSseEventImpl.Builder;
 import org.apache.cxf.message.Message;
 
 public class InboundSseEventProcessor {
@@ -42,7 +43,7 @@ public class InboundSseEventProcessor {
     public static final MediaType SERVER_SENT_EVENTS_TYPE = MediaType.valueOf(SERVER_SENT_EVENTS);
 
     private static final String COMMENT = ": ";
-    private static final String EVENT = "    ";
+    private static final String EVENT = "event: ";
     private static final String ID = "id: ";
     private static final String RETRY = "retry: ";
     private static final String DATA = "data: ";
@@ -82,23 +83,21 @@ public class InboundSseEventProcessor {
                 InboundSseEventImpl.Builder builder = null;
 
                 while (line != null && !Thread.interrupted() && !closed) {
-                    if (!StringUtils.isEmpty(line) && line.startsWith(EVENT)) {
-                        if (builder == null) {
-                            builder = new InboundSseEventImpl.Builder(line.substring(EVENT.length()));
-                        } else {
-                            final InboundSseEvent event = builder.build(factory, message);
-                            builder = new InboundSseEventImpl.Builder(line.substring(EVENT.length()));
-                            listener.onNext(event);
-                        }
-                    } else if (builder != null) {
-                        if (line.startsWith(ID)) {
-                            builder.id(line.substring(ID.length()));
+                    if (StringUtils.isEmpty(line) && builder != null) { /* empty new line */
+                        final InboundSseEvent event = builder.build(factory, message);
+                        builder = null; /* reset the builder for next event */
+                        listener.onNext(event);
+                    } else {
+                        if (line.startsWith(EVENT)) {
+                            builder = getOrCreate(builder).name(line.substring(EVENT.length()));
+                        } else if (line.startsWith(ID)) {
+                            builder = getOrCreate(builder).id(line.substring(ID.length()));
                         } else if (line.startsWith(COMMENT)) {
-                            builder.comment(line.substring(COMMENT.length()));
+                            builder = getOrCreate(builder).comment(line.substring(COMMENT.length()));
                         } else if (line.startsWith(RETRY)) {
-                            builder.reconnectDelay(line.substring(RETRY.length()));
+                            builder = getOrCreate(builder).reconnectDelay(line.substring(RETRY.length()));
                         } else if (line.startsWith(DATA)) {
-                            builder.data(line.substring(DATA.length()));
+                            builder = getOrCreate(builder).data(line.substring(DATA.length()));
                         }
                     }
                     line = reader.readLine();
@@ -140,4 +139,11 @@ public class InboundSseEventProcessor {
             return false;
         }
     }
+    
+    /**
+     * Create builder on-demand, without explicit event demarcation
+     */
+    private static Builder getOrCreate(final Builder builder) {
+        return (builder == null) ? new InboundSseEventImpl.Builder() : builder;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/4cef686c/rt/rs/sse/src/test/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImplTest.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/test/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImplTest.java b/rt/rs/sse/src/test/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImplTest.java
index 9d28972..9db391f 100644
--- a/rt/rs/sse/src/test/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImplTest.java
+++ b/rt/rs/sse/src/test/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImplTest.java
@@ -57,6 +57,7 @@ import org.mockito.junit.MockitoJUnitRunner;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.nullValue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.times;
@@ -65,26 +66,36 @@ import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
 public class SseEventSourceImplTest extends Assert {
-    private static final String EVENT = "    event\n"
+    private static final String EVENT = "event: event\n"
         + "id: 1\n"
         + "data: test data\n"
         + "retry: 10000\n"
         + ": test comment\n"
         + "\n";
     
-    private static final String EVENT_NO_RETRY = "    event\n"
+    private static final String EVENT_JUST_DATA = "\n"
+        + "data: just test data\n"
+        + "\n";
+    
+    private static final String EVENT_JUST_NAME = "\n"
+        + "event: just name\n";
+    
+    private static final String EVENT_NO_RETRY = "event: event\n"
         + "id: 1\n"
         + "data: test data\n"
         + ": test comment\n"
         + "\n";
     
-    private static final String EVENT_BAD_RETRY = "    event\n"
+    private static final String EVENT_BAD_RETRY = "event: event\n"
         + "id: 1\n"
         + "data: test data\n"
         + "retry: blba\n"
         + ": test comment\n"
         + "\n";
     
+    private static final String EVENT_MIXED = EVENT_JUST_DATA + EVENT;
+    private static final String EVENT_BAD_NEW_LINES =  "\n\n\n\n\n\n";
+    
     @Rule
     public ExpectedException exception = ExpectedException.none();
     
@@ -239,6 +250,104 @@ public class SseEventSourceImplTest extends Assert {
             assertThat(events.get(0).getId(), equalTo("1"));
             assertThat(events.get(0).getReconnectDelay(), equalTo(10000L));
             assertThat(events.get(0).getComment(), equalTo("test comment"));
+            assertThat(events.get(0).readData(), equalTo("test data"));
+        }
+    }
+    
+    @Test
+    public void testNoReconnectAndJustDataEventIsReceived() throws InterruptedException, IOException {
+        try (InputStream is = new ByteArrayInputStream(EVENT_JUST_DATA.getBytes(StandardCharsets.UTF_8))) {
+            when(response.getStatus()).thenReturn(200);
+            when(response.readEntity(InputStream.class)).thenReturn(is);
+            
+            final List<InboundSseEvent> events = new ArrayList<>();
+            try (SseEventSource eventSource = withNoReconnect()) {
+                eventSource.register(events::add);
+                eventSource.open();
+                
+                assertThat(eventSource.isOpen(), equalTo(true));
+                verify(response, times(1)).getStatus();
+                
+                // Allow the event processor to pull for events (150ms)
+                Thread.sleep(150);
+            }
+            
+            assertThat(events.size(), equalTo(1));
+            assertThat(events.get(0).getName(), nullValue());
+            assertThat(events.get(0).readData(), equalTo("just test data"));
+        }
+    }
+    
+    @Test
+    public void testNoReconnectAndJustEventNameIsReceived() throws InterruptedException, IOException {
+        try (InputStream is = new ByteArrayInputStream(EVENT_JUST_NAME.getBytes(StandardCharsets.UTF_8))) {
+            when(response.getStatus()).thenReturn(200);
+            when(response.readEntity(InputStream.class)).thenReturn(is);
+            
+            final List<InboundSseEvent> events = new ArrayList<>();
+            try (SseEventSource eventSource = withNoReconnect()) {
+                eventSource.register(events::add);
+                eventSource.open();
+                
+                assertThat(eventSource.isOpen(), equalTo(true));
+                verify(response, times(1)).getStatus();
+                
+                // Allow the event processor to pull for events (150ms)
+                Thread.sleep(150);
+            }
+            
+            assertThat(events.size(), equalTo(1));
+            assertThat(events.get(0).getName(), equalTo("just name"));
+        }
+    }
+    
+    @Test
+    public void testNoReconnectAndMixedEventsAreReceived() throws InterruptedException, IOException {
+        try (InputStream is = new ByteArrayInputStream(EVENT_MIXED.getBytes(StandardCharsets.UTF_8))) {
+            when(response.getStatus()).thenReturn(200);
+            when(response.readEntity(InputStream.class)).thenReturn(is);
+            
+            final List<InboundSseEvent> events = new ArrayList<>();
+            try (SseEventSource eventSource = withNoReconnect()) {
+                eventSource.register(events::add);
+                eventSource.open();
+                
+                assertThat(eventSource.isOpen(), equalTo(true));
+                verify(response, times(1)).getStatus();
+                
+                // Allow the event processor to pull for events (150ms)
+                Thread.sleep(150);
+            }
+            
+            assertThat(events.size(), equalTo(2));
+            assertThat(events.get(0).getName(), nullValue());
+            assertThat(events.get(0).readData(), equalTo("just test data"));
+            assertThat(events.get(1).getId(), equalTo("1"));
+            assertThat(events.get(1).getReconnectDelay(), equalTo(10000L));
+            assertThat(events.get(1).getComment(), equalTo("test comment"));
+            assertThat(events.get(1).readData(), equalTo("test data"));
+        }
+    }
+    
+    @Test
+    public void testNoReconnectAndNoEventsAreDetected() throws InterruptedException, IOException {
+        try (InputStream is = new ByteArrayInputStream(EVENT_BAD_NEW_LINES.getBytes(StandardCharsets.UTF_8))) {
+            when(response.getStatus()).thenReturn(200);
+            when(response.readEntity(InputStream.class)).thenReturn(is);
+            
+            final List<InboundSseEvent> events = new ArrayList<>();
+            try (SseEventSource eventSource = withNoReconnect()) {
+                eventSource.register(events::add);
+                eventSource.open();
+                
+                assertThat(eventSource.isOpen(), equalTo(true));
+                verify(response, times(1)).getStatus();
+                
+                // Allow the event processor to pull for events (150ms)
+                Thread.sleep(150);
+            }
+            
+            assertThat(events.size(), equalTo(0));
         }
     }
     
@@ -269,8 +378,10 @@ public class SseEventSourceImplTest extends Assert {
             assertThat(events.size(), equalTo(2));
             assertThat(events.get(0).getId(), equalTo("1"));
             assertThat(events.get(0).getComment(), equalTo("test comment"));
+            assertThat(events.get(0).readData(), equalTo("test data"));
             assertThat(events.get(1).getId(), equalTo("1"));
             assertThat(events.get(1).getComment(), equalTo("test comment"));
+            assertThat(events.get(1).readData(), equalTo("test data"));
         } finally {
             for (final InputStream is: closeables) {
                 is.close();
@@ -323,6 +434,7 @@ public class SseEventSourceImplTest extends Assert {
             assertThat(events.get(0).getId(), equalTo("1"));
             assertThat(events.get(0).getReconnectDelay(), equalTo(-1L));
             assertThat(events.get(0).getComment(), equalTo("test comment"));
+            assertThat(events.get(0).readData(), equalTo("test data"));
         }
     }