You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2020/02/01 18:50:19 UTC
[cxf] branch 3.3.x-fixes updated: CXF-8207: SseEventSource loses
lastEventId on the 2nd reconnect attempt
This is an automated email from the ASF dual-hosted git repository.
reta pushed a commit to branch 3.3.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git
The following commit(s) were added to refs/heads/3.3.x-fixes by this push:
new b10905c CXF-8207: SseEventSource loses lastEventId on the 2nd reconnect attempt
b10905c is described below
commit b10905c1a58674e68dfb6c888a730cb7ba7b6e74
Author: reta <dr...@gmail.com>
AuthorDate: Sat Feb 1 13:31:02 2020 -0500
CXF-8207: SseEventSource loses lastEventId on the 2nd reconnect attempt
(cherry picked from commit df0e67ea2e3233a83c59cb5e9d613aee045de705)
---
.../cxf/jaxrs/sse/client/SseEventSourceImpl.java | 6 +-
.../jaxrs/sse/client/SseEventSourceImplTest.java | 104 +++++++++++++++++++--
2 files changed, 103 insertions(+), 7 deletions(-)
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java
index f0c2acf..4e8ede2 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java
@@ -63,6 +63,10 @@ public class SseEventSourceImpl implements SseEventSource {
private class InboundSseEventListenerDelegate implements InboundSseEventListener {
private String lastEventId;
+ InboundSseEventListenerDelegate(String lastEventId) {
+ this.lastEventId = lastEventId;
+ }
+
@Override
public void onNext(InboundSseEvent event) {
lastEventId = event.getId();
@@ -180,7 +184,7 @@ public class SseEventSourceImpl implements SseEventSource {
}
private void connect(String lastEventId) {
- final InboundSseEventListenerDelegate delegate = new InboundSseEventListenerDelegate();
+ final InboundSseEventListenerDelegate delegate = new InboundSseEventListenerDelegate(lastEventId);
Response response = null;
try {
diff --git a/rt/rs/sse/src/test/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImplTest.java b/rt/rs/sse/src/test/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImplTest.java
index 3926feb..93c6006 100644
--- a/rt/rs/sse/src/test/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImplTest.java
+++ b/rt/rs/sse/src/test/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImplTest.java
@@ -28,11 +28,15 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import javax.ws.rs.BadRequestException;
import javax.ws.rs.GET;
import javax.ws.rs.Produces;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
@@ -60,7 +64,7 @@ public class SseEventSourceImplTest {
enum Type {
NO_CONTENT, NO_SERVER, BUSY,
EVENT, EVENT_JUST_DATA, EVENT_JUST_NAME, EVENT_MULTILINE_DATA, EVENT_NO_RETRY, EVENT_BAD_RETRY, EVENT_MIXED,
- EVENT_BAD_NEW_LINES, EVENT_NOT_AUTHORIZED;
+ EVENT_BAD_NEW_LINES, EVENT_NOT_AUTHORIZED, EVENT_LAST_EVENT_ID, EVENT_RETRY_LAST_EVENT_ID;
}
private static final String EVENT = "event: event\n"
@@ -375,21 +379,69 @@ public class SseEventSourceImplTest {
}
}
+ @Test
+ public void testConnectWithLastEventId() throws InterruptedException, IOException {
+ try (SseEventSource eventSource = withNoReconnect(Type.EVENT_LAST_EVENT_ID, "10")) {
+ eventSource.open();
+ assertThat(eventSource.isOpen(), equalTo(true));
+
+ // Allow the event processor to pull for events (150ms)
+ Thread.sleep(150L);
+ }
+
+ assertThat(events.size(), equalTo(1));
+ assertThat(events.get(0).getId(), equalTo("10"));
+ assertThat(events.get(0).getReconnectDelay(), equalTo(10000L));
+ assertThat(events.get(0).getComment(), equalTo("test comment"));
+ assertThat(events.get(0).readData(), equalTo("test data"));
+ }
+
+ @Test
+ public void testReconnectWithLastEventId() throws InterruptedException, IOException {
+ try (SseEventSource eventSource = withReconnect(Type.EVENT_RETRY_LAST_EVENT_ID, "10")) {
+ eventSource.open();
+ assertThat(eventSource.isOpen(), equalTo(false));
+ assertThat(errors.size(), equalTo(1));
+
+ // Allow the event processor to pull for events (150ms)
+ Thread.sleep(150L);
+ }
+
+ assertThat(events.size(), equalTo(1));
+ assertThat(events.get(0).getId(), equalTo("10"));
+ assertThat(events.get(0).getReconnectDelay(), equalTo(10000L));
+ assertThat(events.get(0).getComment(), equalTo("test comment"));
+ assertThat(events.get(0).readData(), equalTo("test data"));
+ }
+
private SseEventSource withNoReconnect(Type type) {
- SseEventSource eventSource = SseEventSource.target(target(type)).build();
+ return withNoReconnect(type, null);
+ }
+
+ private SseEventSource withNoReconnect(Type type, String lastEventId) {
+ SseEventSource eventSource = SseEventSource.target(target(type, lastEventId)).build();
eventSource.register(events::add, errors::add);
return eventSource;
}
-
+
private SseEventSource withReconnect(Type type) {
- SseEventSource eventSource = SseEventSource.target(target(type)).reconnectingEvery(100L, TimeUnit.MILLISECONDS)
+ return withReconnect(type, null);
+ }
+
+ private SseEventSource withReconnect(Type type, String lastEventId) {
+ SseEventSource eventSource = SseEventSource.target(target(type, lastEventId))
+ .reconnectingEvery(100L, TimeUnit.MILLISECONDS)
.build();
eventSource.register(events::add, errors::add);
return eventSource;
}
- private static WebTarget target(Type type) {
- return ClientBuilder.newClient().target(LOCAL_ADDRESS + type.name());
+ private static WebTarget target(Type type, String lastEventId) {
+ final WebTarget target = ClientBuilder.newClient().target(LOCAL_ADDRESS + type.name());
+ if (lastEventId != null) {
+ target.property(HttpHeaders.LAST_EVENT_ID_HEADER, lastEventId);
+ }
+ return target;
}
@BeforeClass
@@ -408,6 +460,18 @@ public class SseEventSourceImplTest {
startServer(Type.EVENT_BAD_RETRY, EVENT_BAD_RETRY);
startServer(Type.EVENT_MIXED, EVENT_MIXED);
startServer(Type.EVENT_BAD_NEW_LINES, EVENT_BAD_NEW_LINES);
+
+ final Function<HttpHeaders, String> function = headers -> {
+ final String lastEventId = headers.getHeaderString(HttpHeaders.LAST_EVENT_ID_HEADER);
+ if (lastEventId != null) {
+ return EVENT.replaceAll("id: 1", "id: " + lastEventId);
+ } else {
+ return EVENT;
+ }
+ };
+
+ startDynamicServer(Type.EVENT_RETRY_LAST_EVENT_ID, function);
+ startDynamicServer(Type.EVENT_LAST_EVENT_ID, function);
}
private static void startNotAuthorizedServer(Type type) {
@@ -430,6 +494,13 @@ public class SseEventSourceImplTest {
sf.setServiceBean(new EventServer(payload));
SERVERS.put(type, sf.create());
}
+
+ private static void startDynamicServer(Type type, Function<HttpHeaders, String> function) {
+ JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
+ sf.setAddress(LOCAL_ADDRESS + type.name());
+ sf.setServiceBean(new DynamicServer(function, type == Type.EVENT_RETRY_LAST_EVENT_ID));
+ SERVERS.put(type, sf.create());
+ }
@AfterClass
public static void stopServer() {
@@ -475,4 +546,25 @@ public class SseEventSourceImplTest {
}
}
+ public static class DynamicServer {
+ private final Function<HttpHeaders, String> function;
+ private volatile boolean fail = true;
+
+ public DynamicServer(Function<HttpHeaders, String> function, boolean fail) {
+ this.function = function;
+ this.fail = fail;
+ }
+
+ @GET
+ @Produces(MediaType.SERVER_SENT_EVENTS)
+ public String event(@Context HttpHeaders headers) {
+ if (fail) {
+ fail = false;
+ throw new BadRequestException();
+ } else {
+ return function.apply(headers);
+ }
+ }
+ }
+
}