You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2020/02/01 18:50:19 UTC

[cxf] branch 3.3.x-fixes updated: CXF-8207: SseEventSource loses lastEventId on the 2nd reconnect attempt

This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch 3.3.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git


The following commit(s) were added to refs/heads/3.3.x-fixes by this push:
     new b10905c  CXF-8207: SseEventSource loses lastEventId on the 2nd reconnect attempt
b10905c is described below

commit b10905c1a58674e68dfb6c888a730cb7ba7b6e74
Author: reta <dr...@gmail.com>
AuthorDate: Sat Feb 1 13:31:02 2020 -0500

    CXF-8207: SseEventSource loses lastEventId on the 2nd reconnect attempt
    
    (cherry picked from commit df0e67ea2e3233a83c59cb5e9d613aee045de705)
---
 .../cxf/jaxrs/sse/client/SseEventSourceImpl.java   |   6 +-
 .../jaxrs/sse/client/SseEventSourceImplTest.java   | 104 +++++++++++++++++++--
 2 files changed, 103 insertions(+), 7 deletions(-)

diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java
index f0c2acf..4e8ede2 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java
@@ -63,6 +63,10 @@ public class SseEventSourceImpl implements SseEventSource {
     private class InboundSseEventListenerDelegate implements InboundSseEventListener {
         private String lastEventId;
         
+        InboundSseEventListenerDelegate(String lastEventId) {
+            this.lastEventId = lastEventId;
+        }
+        
         @Override
         public void onNext(InboundSseEvent event) {
             lastEventId = event.getId();
@@ -180,7 +184,7 @@ public class SseEventSourceImpl implements SseEventSource {
     }
 
     private void connect(String lastEventId) {
-        final InboundSseEventListenerDelegate delegate = new InboundSseEventListenerDelegate();
+        final InboundSseEventListenerDelegate delegate = new InboundSseEventListenerDelegate(lastEventId);
         Response response = null;
         
         try {
diff --git a/rt/rs/sse/src/test/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImplTest.java b/rt/rs/sse/src/test/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImplTest.java
index 3926feb..93c6006 100644
--- a/rt/rs/sse/src/test/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImplTest.java
+++ b/rt/rs/sse/src/test/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImplTest.java
@@ -28,11 +28,15 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 
+import javax.ws.rs.BadRequestException;
 import javax.ws.rs.GET;
 import javax.ws.rs.Produces;
 import javax.ws.rs.client.ClientBuilder;
 import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
@@ -60,7 +64,7 @@ public class SseEventSourceImplTest {
     enum Type {
         NO_CONTENT, NO_SERVER, BUSY,
         EVENT, EVENT_JUST_DATA, EVENT_JUST_NAME, EVENT_MULTILINE_DATA, EVENT_NO_RETRY, EVENT_BAD_RETRY, EVENT_MIXED,
-        EVENT_BAD_NEW_LINES, EVENT_NOT_AUTHORIZED;
+        EVENT_BAD_NEW_LINES, EVENT_NOT_AUTHORIZED, EVENT_LAST_EVENT_ID, EVENT_RETRY_LAST_EVENT_ID;
     }
 
     private static final String EVENT = "event: event\n"
@@ -375,21 +379,69 @@ public class SseEventSourceImplTest {
         }
     }
 
+    @Test
+    public void testConnectWithLastEventId() throws InterruptedException, IOException {
+        try (SseEventSource eventSource = withNoReconnect(Type.EVENT_LAST_EVENT_ID, "10")) {
+            eventSource.open();
+            assertThat(eventSource.isOpen(), equalTo(true));
+
+            // Allow the event processor to pull for events (150ms)
+            Thread.sleep(150L);
+        }
+
+        assertThat(events.size(), equalTo(1));
+        assertThat(events.get(0).getId(), equalTo("10"));
+        assertThat(events.get(0).getReconnectDelay(), equalTo(10000L));
+        assertThat(events.get(0).getComment(), equalTo("test comment"));
+        assertThat(events.get(0).readData(), equalTo("test data"));
+    }
+    
+    @Test
+    public void testReconnectWithLastEventId() throws InterruptedException, IOException {
+        try (SseEventSource eventSource = withReconnect(Type.EVENT_RETRY_LAST_EVENT_ID, "10")) {
+            eventSource.open();
+            assertThat(eventSource.isOpen(), equalTo(false));
+            assertThat(errors.size(), equalTo(1));
+
+            // Allow the event processor to pull for events (150ms)
+            Thread.sleep(150L);
+        }
+
+        assertThat(events.size(), equalTo(1));
+        assertThat(events.get(0).getId(), equalTo("10"));
+        assertThat(events.get(0).getReconnectDelay(), equalTo(10000L));
+        assertThat(events.get(0).getComment(), equalTo("test comment"));
+        assertThat(events.get(0).readData(), equalTo("test data"));
+    }
+    
     private SseEventSource withNoReconnect(Type type) {
-        SseEventSource eventSource = SseEventSource.target(target(type)).build();
+        return withNoReconnect(type, null);
+    }
+    
+    private SseEventSource withNoReconnect(Type type, String lastEventId) {
+        SseEventSource eventSource = SseEventSource.target(target(type, lastEventId)).build();
         eventSource.register(events::add, errors::add);
         return eventSource;
     }
-
+    
     private SseEventSource withReconnect(Type type) {
-        SseEventSource eventSource = SseEventSource.target(target(type)).reconnectingEvery(100L, TimeUnit.MILLISECONDS)
+        return withReconnect(type, null);
+    }
+
+    private SseEventSource withReconnect(Type type, String lastEventId) {
+        SseEventSource eventSource = SseEventSource.target(target(type, lastEventId))
+                .reconnectingEvery(100L, TimeUnit.MILLISECONDS)
                 .build();
         eventSource.register(events::add, errors::add);
         return eventSource;
     }
 
-    private static WebTarget target(Type type) {
-        return ClientBuilder.newClient().target(LOCAL_ADDRESS + type.name());
+    private static WebTarget target(Type type, String lastEventId) {
+        final WebTarget target = ClientBuilder.newClient().target(LOCAL_ADDRESS + type.name());
+        if (lastEventId != null) {
+            target.property(HttpHeaders.LAST_EVENT_ID_HEADER, lastEventId);
+        }
+        return target;
     }
 
     @BeforeClass
@@ -408,6 +460,18 @@ public class SseEventSourceImplTest {
         startServer(Type.EVENT_BAD_RETRY, EVENT_BAD_RETRY);
         startServer(Type.EVENT_MIXED, EVENT_MIXED);
         startServer(Type.EVENT_BAD_NEW_LINES, EVENT_BAD_NEW_LINES);
+        
+        final Function<HttpHeaders, String> function = headers -> {
+            final String lastEventId = headers.getHeaderString(HttpHeaders.LAST_EVENT_ID_HEADER);
+            if (lastEventId != null) {
+                return EVENT.replaceAll("id: 1", "id: " + lastEventId); 
+            } else {
+                return EVENT;
+            }
+        };
+        
+        startDynamicServer(Type.EVENT_RETRY_LAST_EVENT_ID, function);
+        startDynamicServer(Type.EVENT_LAST_EVENT_ID, function);
     }
 
     private static void startNotAuthorizedServer(Type type) {
@@ -430,6 +494,13 @@ public class SseEventSourceImplTest {
         sf.setServiceBean(new EventServer(payload));
         SERVERS.put(type, sf.create());
     }
+    
+    private static void startDynamicServer(Type type, Function<HttpHeaders, String> function) {
+        JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
+        sf.setAddress(LOCAL_ADDRESS + type.name());
+        sf.setServiceBean(new DynamicServer(function, type == Type.EVENT_RETRY_LAST_EVENT_ID));
+        SERVERS.put(type, sf.create());
+    }
 
     @AfterClass
     public static void stopServer() {
@@ -475,4 +546,25 @@ public class SseEventSourceImplTest {
         }
     }
 
+    public static class DynamicServer {
+        private final Function<HttpHeaders, String> function;
+        private volatile boolean fail = true; 
+
+        public DynamicServer(Function<HttpHeaders, String> function, boolean fail) {
+            this.function = function;
+            this.fail = fail;
+        }
+
+        @GET
+        @Produces(MediaType.SERVER_SENT_EVENTS)
+        public String event(@Context HttpHeaders headers) {
+            if (fail) {
+                fail = false;
+                throw new BadRequestException();
+            } else {
+                return function.apply(headers);
+            }
+        }
+    }
+
 }