You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by bu...@apache.org on 2019/05/29 11:41:48 UTC

[cxf] branch master updated: cxf-rt-rs-sse: SseEventSourceImplTest on server instead of mock

This is an automated email from the ASF dual-hosted git repository.

buhhunyx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cxf.git


The following commit(s) were added to refs/heads/master by this push:
     new 6488c5f  cxf-rt-rs-sse: SseEventSourceImplTest on server instead of mock
6488c5f is described below

commit 6488c5f422c8df8808f629efd44d19b6f6c7e48d
Author: Alexey Markevich <bu...@gmail.com>
AuthorDate: Wed May 29 14:40:53 2019 +0300

    cxf-rt-rs-sse: SseEventSourceImplTest on server instead of mock
---
 .../jaxrs/sse/client/SseEventSourceImplTest.java   | 453 +++++++++------------
 1 file changed, 195 insertions(+), 258 deletions(-)

diff --git a/rt/rs/sse/src/test/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImplTest.java b/rt/rs/sse/src/test/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImplTest.java
index 0b7f1c0..1c0ae15 100644
--- a/rt/rs/sse/src/test/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImplTest.java
+++ b/rt/rs/sse/src/test/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImplTest.java
@@ -18,54 +18,48 @@
  */
 package org.apache.cxf.jaxrs.sse.client;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Collection;
+import java.util.EnumMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
-import javax.ws.rs.client.Invocation;
-import javax.ws.rs.core.Configuration;
+import javax.ws.rs.GET;
+import javax.ws.rs.Produces;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
 import javax.ws.rs.sse.InboundSseEvent;
 import javax.ws.rs.sse.SseEventSource;
 
-import org.apache.cxf.endpoint.Endpoint;
-import org.apache.cxf.jaxrs.client.ClientConfiguration;
-import org.apache.cxf.jaxrs.client.ClientProviderFactory;
-import org.apache.cxf.jaxrs.client.WebClient;
-import org.apache.cxf.jaxrs.client.spec.ClientImpl.WebTargetImpl;
+import org.apache.cxf.endpoint.Server;
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
 
 import org.junit.After;
-import org.junit.Before;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
-@RunWith(MockitoJUnitRunner.class)
+
 public class SseEventSourceImplTest {
+
+    enum Type {
+        NO_CONTENT, NO_SERVER, BUSY,
+        EVENT, EVENT_JUST_DATA, EVENT_JUST_NAME, EVENT_NO_RETRY, EVENT_BAD_RETRY, EVENT_MIXED, EVENT_BAD_NEW_LINES;
+    }
+
     private static final String EVENT = "event: event\n"
         + "id: 1\n"
         + "data: test data\n"
@@ -96,124 +90,75 @@ public class SseEventSourceImplTest {
     private static final String EVENT_MIXED = EVENT_JUST_DATA + EVENT;
     private static final String EVENT_BAD_NEW_LINES =  "\n\n\n\n\n\n";
 
+    private static final String LOCAL_ADDRESS = "local://";
+
+    private static final Map<Type, Server> SERVERS = new EnumMap<>(Type.class);
+
     @Rule
     public ExpectedException exception = ExpectedException.none();
 
-    private final ExecutorService executor = Executors.newSingleThreadExecutor();
-    private final ClientProviderFactory clientProviderFactory = ClientProviderFactory.createInstance(null);
-
-    /**
-     * Subclass the WebClient to augment the visibility of getConfiguration() method.
-     */
-    private static class TestWebClient extends WebClient {
-        TestWebClient(URI baseURI) {
-            super(baseURI);
-        }
-
-        @Override
-        public ClientConfiguration getConfiguration() {
-            return super.getConfiguration();
-        }
-    }
+    private final List<InboundSseEvent> events = new ArrayList<>();
+    private final List<Throwable> errors = new ArrayList<>();
 
-    @Mock
-    private TestWebClient client;
-    @Mock
-    private ClientConfiguration clientConfiguration;
-    @Mock
-    private WebTargetImpl target;
-    @Mock
-    private Configuration configuration;
-    @Mock
-    private Invocation.Builder builder;
-    @Mock
-    private Endpoint endpoint;
-    @Mock
-    private Response response;
-
-    @Before
-    public void setUp() {
-        when(target.getConfiguration()).thenReturn(configuration);
-        when(target.getWebClient()).thenReturn(client);
-        when(target.request(MediaType.SERVER_SENT_EVENTS)).thenReturn(builder);
-        when(builder.header(any(String.class), any(Object.class))).thenReturn(builder);
-        when(builder.get()).thenReturn(response);
-        when(client.getConfiguration()).thenReturn(clientConfiguration);
-        when(clientConfiguration.getEndpoint()).thenReturn(endpoint);
-        when(endpoint.get("org.apache.cxf.jaxrs.client.ClientProviderFactory")).thenReturn(clientProviderFactory);
-    }
+    private final ExecutorService executor = Executors.newSingleThreadExecutor();
 
     @After
     public void tearDown() throws InterruptedException {
         executor.shutdown();
-        executor.awaitTermination(1, TimeUnit.SECONDS);
+        executor.awaitTermination(1L, TimeUnit.SECONDS);
     }
 
     @Test
     public void testNoReconnectWhenNoContentIsReturned() {
-        // Verify that 204 response code won't force reconnect
-        when(response.getStatus()).thenReturn(204);
-
-        try (SseEventSource eventSource = withNoReconnect()) {
+        try (SseEventSource eventSource = withNoReconnect(Type.NO_CONTENT)) {
             eventSource.open();
             assertThat(eventSource.isOpen(), equalTo(false));
-            verify(response, times(1)).getStatus();
+
+            assertThat(events.size(), equalTo(0));
         }
     }
 
     @Test
     public void testReuseSameEventSourceSeveralTimes() {
-        // Verify that 204 response code won't force reconnect
-        when(response.getStatus()).thenReturn(204);
-
-        try (SseEventSource eventSource = withNoReconnect()) {
+        try (SseEventSource eventSource = withNoReconnect(Type.NO_CONTENT)) {
             eventSource.open();
             assertThat(eventSource.isOpen(), equalTo(false));
-            verify(response, times(1)).getStatus();
 
             eventSource.open();
             assertThat(eventSource.isOpen(), equalTo(false));
-            verify(response, times(2)).getStatus();
+
+            assertThat(events.size(), equalTo(0));
         }
     }
 
     @Test
     public void testReconnectWillBeScheduledOnError() throws InterruptedException {
-        when(builder.get()).thenThrow(new RuntimeException("Connection refused"));
-
-        try (SseEventSource eventSource = withReconnect()) {
+        try (SseEventSource eventSource = withReconnect(Type.NO_SERVER)) {
             eventSource.open();
             assertThat(eventSource.isOpen(), equalTo(false));
 
             // Sleep a little bit for reconnect to reschedule
-            Thread.sleep(150);
-            verify(builder, atLeast(2)).get();
+            Thread.sleep(150L);
+            assertThat(errors.size(), equalTo(2));
         }
     }
 
     @Test
     public void testNoReconnectWillBeScheduledWhenClosed() throws InterruptedException {
-        when(builder.get()).thenThrow(new RuntimeException("Connection refused"));
-
-        try (SseEventSource eventSource = withReconnect()) {
+        try (SseEventSource eventSource = withReconnect(Type.NO_SERVER)) {
             eventSource.open();
             assertThat(eventSource.isOpen(), equalTo(false));
-            eventSource.close(1, TimeUnit.SECONDS);
+            eventSource.close(1L, TimeUnit.SECONDS);
 
             // Sleep a little bit to make sure for reconnect to reschedule (after 100ms)
-            Thread.sleep(150);
-            verify(builder, times(1)).get();
+            Thread.sleep(150L);
+            assertThat(errors.size(), equalTo(1));
         }
     }
 
     @Test
     public void testWhenTryToConnectTwiceSecondAttemtShouldFail() throws InterruptedException, ExecutionException {
-        when(builder.get()).then(invocation -> {
-            Thread.sleep(100);
-            return response;
-        });
-
-        try (SseEventSource eventSource = withReconnect()) {
+        try (SseEventSource eventSource = withReconnect(Type.BUSY)) {
             eventSource.open();
 
             // The attempt to open the SSE connection in another thread at the same
@@ -223,233 +168,152 @@ public class SseEventSourceImplTest {
             assertThat(future.get(), equalTo(null));
 
             assertThat(eventSource.isOpen(), equalTo(true));
-            verify(builder, times(1)).get();
+            assertThat(events.size(), equalTo(1));
         }
     }
 
     @Test
     public void testNoReconnectAndOneEventReceived() throws InterruptedException, IOException {
-        try (InputStream is = new ByteArrayInputStream(EVENT.getBytes(StandardCharsets.UTF_8))) {
-            when(response.getStatus()).thenReturn(200);
-            when(response.readEntity(InputStream.class)).thenReturn(is);
-
-            final List<InboundSseEvent> events = new ArrayList<>();
-            try (SseEventSource eventSource = withNoReconnect()) {
-                eventSource.register(events::add);
-                eventSource.open();
-
-                assertThat(eventSource.isOpen(), equalTo(true));
-                verify(response, times(1)).getStatus();
+        try (SseEventSource eventSource = withNoReconnect(Type.EVENT)) {
+            eventSource.open();
 
-                // Allow the event processor to pull for events (150ms)
-                Thread.sleep(150);
-            }
+            assertThat(eventSource.isOpen(), equalTo(true));
 
-            assertThat(events.size(), equalTo(1));
-            assertThat(events.get(0).getId(), equalTo("1"));
-            assertThat(events.get(0).getReconnectDelay(), equalTo(10000L));
-            assertThat(events.get(0).getComment(), equalTo("test comment"));
-            assertThat(events.get(0).readData(), equalTo("test data"));
+            // Allow the event processor to pull for events (150ms)
+            Thread.sleep(150L);
         }
+
+        assertThat(events.size(), equalTo(1));
+        assertThat(events.get(0).getId(), equalTo("1"));
+        assertThat(events.get(0).getReconnectDelay(), equalTo(10000L));
+        assertThat(events.get(0).getComment(), equalTo("test comment"));
+        assertThat(events.get(0).readData(), equalTo("test data"));
     }
 
     @Test
     public void testNoReconnectAndJustDataEventIsReceived() throws InterruptedException, IOException {
-        try (InputStream is = new ByteArrayInputStream(EVENT_JUST_DATA.getBytes(StandardCharsets.UTF_8))) {
-            when(response.getStatus()).thenReturn(200);
-            when(response.readEntity(InputStream.class)).thenReturn(is);
-
-            final List<InboundSseEvent> events = new ArrayList<>();
-            try (SseEventSource eventSource = withNoReconnect()) {
-                eventSource.register(events::add);
-                eventSource.open();
-
-                assertThat(eventSource.isOpen(), equalTo(true));
-                verify(response, times(1)).getStatus();
+        try (SseEventSource eventSource = withNoReconnect(Type.EVENT_JUST_DATA)) {
+            eventSource.open();
 
-                // Allow the event processor to pull for events (150ms)
-                Thread.sleep(150);
-            }
+            assertThat(eventSource.isOpen(), equalTo(true));
 
-            assertThat(events.size(), equalTo(1));
-            assertThat(events.get(0).getName(), nullValue());
-            assertThat(events.get(0).readData(), equalTo("just test data"));
+            // Allow the event processor to pull for events (150ms)
+            Thread.sleep(150L);
         }
+
+        assertThat(events.size(), equalTo(1));
+        assertThat(events.get(0).getName(), nullValue());
+        assertThat(events.get(0).readData(), equalTo("just test data"));
     }
 
     @Test
     public void testNoReconnectAndJustEventNameIsReceived() throws InterruptedException, IOException {
-        try (InputStream is = new ByteArrayInputStream(EVENT_JUST_NAME.getBytes(StandardCharsets.UTF_8))) {
-            when(response.getStatus()).thenReturn(200);
-            when(response.readEntity(InputStream.class)).thenReturn(is);
-
-            final List<InboundSseEvent> events = new ArrayList<>();
-            try (SseEventSource eventSource = withNoReconnect()) {
-                eventSource.register(events::add);
-                eventSource.open();
-
-                assertThat(eventSource.isOpen(), equalTo(true));
-                verify(response, times(1)).getStatus();
+        try (SseEventSource eventSource = withNoReconnect(Type.EVENT_JUST_NAME)) {
+            eventSource.open();
 
-                // Allow the event processor to pull for events (150ms)
-                Thread.sleep(150);
-            }
+            assertThat(eventSource.isOpen(), equalTo(true));
 
-            assertThat(events.size(), equalTo(1));
-            assertThat(events.get(0).getName(), equalTo("just name"));
+            // Allow the event processor to pull for events (150ms)
+            Thread.sleep(150L);
         }
+
+        assertThat(events.size(), equalTo(1));
+        assertThat(events.get(0).getName(), equalTo("just name"));
     }
 
     @Test
     public void testNoReconnectAndMixedEventsAreReceived() throws InterruptedException, IOException {
-        try (InputStream is = new ByteArrayInputStream(EVENT_MIXED.getBytes(StandardCharsets.UTF_8))) {
-            when(response.getStatus()).thenReturn(200);
-            when(response.readEntity(InputStream.class)).thenReturn(is);
-
-            final List<InboundSseEvent> events = new ArrayList<>();
-            try (SseEventSource eventSource = withNoReconnect()) {
-                eventSource.register(events::add);
-                eventSource.open();
-
-                assertThat(eventSource.isOpen(), equalTo(true));
-                verify(response, times(1)).getStatus();
+        try (SseEventSource eventSource = withNoReconnect(Type.EVENT_MIXED)) {
+            eventSource.open();
 
-                // Allow the event processor to pull for events (150ms)
-                Thread.sleep(150);
-            }
+            assertThat(eventSource.isOpen(), equalTo(true));
 
-            assertThat(events.size(), equalTo(2));
-            assertThat(events.get(0).getName(), nullValue());
-            assertThat(events.get(0).readData(), equalTo("just test data"));
-            assertThat(events.get(1).getId(), equalTo("1"));
-            assertThat(events.get(1).getReconnectDelay(), equalTo(10000L));
-            assertThat(events.get(1).getComment(), equalTo("test comment"));
-            assertThat(events.get(1).readData(), equalTo("test data"));
+            // Allow the event processor to pull for events (150ms)
+            Thread.sleep(150L);
         }
+
+        assertThat(events.size(), equalTo(2));
+        assertThat(events.get(0).getName(), nullValue());
+        assertThat(events.get(0).readData(), equalTo("just test data"));
+        assertThat(events.get(1).getId(), equalTo("1"));
+        assertThat(events.get(1).getReconnectDelay(), equalTo(10000L));
+        assertThat(events.get(1).getComment(), equalTo("test comment"));
+        assertThat(events.get(1).readData(), equalTo("test data"));
     }
 
     @Test
     public void testNoReconnectAndNoEventsAreDetected() throws InterruptedException, IOException {
-        try (InputStream is = new ByteArrayInputStream(EVENT_BAD_NEW_LINES.getBytes(StandardCharsets.UTF_8))) {
-            when(response.getStatus()).thenReturn(200);
-            when(response.readEntity(InputStream.class)).thenReturn(is);
-
-            final List<InboundSseEvent> events = new ArrayList<>();
-            try (SseEventSource eventSource = withNoReconnect()) {
-                eventSource.register(events::add);
-                eventSource.open();
-
-                assertThat(eventSource.isOpen(), equalTo(true));
-                verify(response, times(1)).getStatus();
+        try (SseEventSource eventSource = withNoReconnect(Type.EVENT_BAD_NEW_LINES)) {
+            eventSource.open();
 
-                // Allow the event processor to pull for events (150ms)
-                Thread.sleep(150);
-            }
+            assertThat(eventSource.isOpen(), equalTo(true));
 
-            assertThat(events.size(), equalTo(0));
+            // Allow the event processor to pull for events (150ms)
+            Thread.sleep(150L);
         }
+
+        assertThat(events.size(), equalTo(0));
     }
 
     @Test
     public void testReconnectAndTwoEventsReceived() throws InterruptedException, IOException {
-        final Collection<InputStream> closeables = new ArrayList<>();
-
-        try {
-            when(response.getStatus()).thenReturn(200);
-            when(response.readEntity(InputStream.class)).then(Invocation -> {
-                final InputStream is = new ByteArrayInputStream(EVENT_NO_RETRY.getBytes(StandardCharsets.UTF_8));
-                closeables.add(is);
-                return is;
-            });
-
-            final List<InboundSseEvent> events = new ArrayList<>();
-            try (SseEventSource eventSource = withReconnect()) {
-                eventSource.register(events::add);
-                eventSource.open();
-
-                assertThat(eventSource.isOpen(), equalTo(true));
-                //verify(response, times(1)).getStatus();
-
-                // Allow the event processor to pull for events (200ms)
-                Thread.sleep(150);
-            }
+        try (SseEventSource eventSource = withReconnect(Type.EVENT_NO_RETRY)) {
+            eventSource.open();
 
-            assertThat(events.size(), equalTo(2));
-            assertThat(events.get(0).getId(), equalTo("1"));
-            assertThat(events.get(0).getComment(), equalTo("test comment"));
-            assertThat(events.get(0).readData(), equalTo("test data"));
-            assertThat(events.get(1).getId(), equalTo("1"));
-            assertThat(events.get(1).getComment(), equalTo("test comment"));
-            assertThat(events.get(1).readData(), equalTo("test data"));
-        } finally {
-            for (final InputStream is: closeables) {
-                is.close();
-            }
+            assertThat(eventSource.isOpen(), equalTo(true));
+
+            // Allow the event processor to pull for events (200ms)
+            Thread.sleep(150L);
         }
+
+        assertThat(events.size(), equalTo(2));
+        assertThat(events.get(0).getId(), equalTo("1"));
+        assertThat(events.get(0).getComment(), equalTo("test comment"));
+        assertThat(events.get(0).readData(), equalTo("test data"));
+        assertThat(events.get(1).getId(), equalTo("1"));
+        assertThat(events.get(1).getComment(), equalTo("test comment"));
+        assertThat(events.get(1).readData(), equalTo("test data"));
     }
 
     @Test
     public void testNoReconnectAndCloseTheStreamWhileEventIsBeingReceived() throws InterruptedException, IOException {
-        when(response.getStatus()).thenReturn(200);
-        when(response.readEntity(InputStream.class)).then(invocation -> {
-            Thread.sleep(200);
-            return null;
-        });
-
-        final List<InboundSseEvent> events = new ArrayList<>();
-        try (SseEventSource eventSource = withNoReconnect()) {
-            eventSource.register(events::add);
+        try (SseEventSource eventSource = withNoReconnect(Type.BUSY)) {
             eventSource.open();
 
             assertThat(eventSource.isOpen(), equalTo(true));
-            verify(response, times(1)).getStatus();
 
             // Allow the event processor to pull for events (200ms)
-            Thread.sleep(50);
-            assertThat(eventSource.close(100, TimeUnit.MILLISECONDS), equalTo(true));
+            Thread.sleep(50L);
+            assertThat(eventSource.close(100L, TimeUnit.MILLISECONDS), equalTo(true));
             assertThat(eventSource.isOpen(), equalTo(false));
         }
     }
 
     @Test
     public void testInvalidReconnectDelayInTheEvent() throws InterruptedException, IOException {
-        try (InputStream is = new ByteArrayInputStream(EVENT_BAD_RETRY.getBytes(StandardCharsets.UTF_8))) {
-            when(response.getStatus()).thenReturn(200);
-            when(response.readEntity(InputStream.class)).thenReturn(is);
-
-            final List<InboundSseEvent> events = new ArrayList<>();
-            try (SseEventSource eventSource = withNoReconnect()) {
-                eventSource.register(events::add);
-                eventSource.open();
-
-                assertThat(eventSource.isOpen(), equalTo(true));
-                verify(response, times(1)).getStatus();
+        try (SseEventSource eventSource = withNoReconnect(Type.EVENT_BAD_RETRY)) {
+            eventSource.open();
 
-                // Allow the event processor to pull for events (150ms)
-                Thread.sleep(150);
-            }
+            assertThat(eventSource.isOpen(), equalTo(true));
 
-            assertThat(events.size(), equalTo(1));
-            assertThat(events.get(0).getId(), equalTo("1"));
-            assertThat(events.get(0).getReconnectDelay(), equalTo(-1L));
-            assertThat(events.get(0).getComment(), equalTo("test comment"));
-            assertThat(events.get(0).readData(), equalTo("test data"));
+            // Allow the event processor to pull for events (150ms)
+            Thread.sleep(150L);
         }
+
+        assertThat(events.size(), equalTo(1));
+        assertThat(events.get(0).getId(), equalTo("1"));
+        assertThat(events.get(0).getReconnectDelay(), equalTo(-1L));
+        assertThat(events.get(0).getComment(), equalTo("test comment"));
+        assertThat(events.get(0).readData(), equalTo("test data"));
     }
 
     @Test
     public void testTryToCloseWhileConnecting() throws ExecutionException, InterruptedException {
-        when(response.getStatus()).thenReturn(200);
-        when(builder.get()).then(invocation -> {
-            Thread.sleep(200);
-            return response;
-        });
-
-        try (SseEventSource eventSource = withNoReconnect()) {
+        try (SseEventSource eventSource = withNoReconnect(Type.BUSY)) {
             final Future<?> future = executor.submit(() -> eventSource.open());
 
             // Wait a bit for open() to advance
-            Thread.sleep(100);
+            Thread.sleep(50L);
             eventSource.close();
 
             assertThat(future.get(), equalTo(null));
@@ -457,11 +321,84 @@ public class SseEventSourceImplTest {
         }
     }
 
-    private SseEventSource withNoReconnect() {
-        return SseEventSource.target(target).build();
+    private SseEventSource withNoReconnect(Type type) {
+        SseEventSource eventSource = SseEventSource.target(target(type)).build();
+        eventSource.register(events::add, errors::add);
+        return eventSource;
+    }
+
+    private SseEventSource withReconnect(Type type) {
+        SseEventSource eventSource = SseEventSource.target(target(type)).reconnectingEvery(100L, TimeUnit.MILLISECONDS)
+                .build();
+        eventSource.register(events::add, errors::add);
+        return eventSource;
+    }
+
+    private static WebTarget target(Type type) {
+        return ClientBuilder.newClient().target(LOCAL_ADDRESS + type.name());
+    }
+
+    @BeforeClass
+    public static void startServer() {
+        startServer(Type.NO_CONTENT, null);
+        // Type.NO_SERVER
+
+        Type type = Type.BUSY;
+        JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
+        sf.setAddress(LOCAL_ADDRESS + type.name());
+        sf.setServiceBean(new BusyEventServer());
+        SERVERS.put(type, sf.create());
+
+        startServer(Type.EVENT, EVENT);
+        startServer(Type.EVENT_JUST_DATA, EVENT_JUST_DATA);
+        startServer(Type.EVENT_JUST_NAME, EVENT_JUST_NAME);
+        startServer(Type.EVENT_NO_RETRY, EVENT_NO_RETRY);
+        startServer(Type.EVENT_BAD_RETRY, EVENT_BAD_RETRY);
+        startServer(Type.EVENT_MIXED, EVENT_MIXED);
+        startServer(Type.EVENT_BAD_NEW_LINES, EVENT_BAD_NEW_LINES);
+    }
+
+    private static void startServer(Type type, String payload) {
+        JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
+        sf.setAddress(LOCAL_ADDRESS + type.name());
+        sf.setServiceBean(new EventServer(payload));
+        SERVERS.put(type, sf.create());
     }
 
-    private SseEventSource withReconnect() {
-        return SseEventSource.target(target).reconnectingEvery(100, TimeUnit.MILLISECONDS).build();
+    @AfterClass
+    public static void stopServer() {
+        for (Server server : SERVERS.values()) {
+            server.stop();
+            server.destroy();
+        }
     }
+
+    public static class EventServer {
+        private final String payload;
+
+        public EventServer(String event) {
+            payload = event;
+        }
+
+        @GET
+        @Produces(MediaType.SERVER_SENT_EVENTS)
+        public String event() {
+            return payload;
+        }
+    }
+
+    public static class BusyEventServer extends EventServer {
+        public BusyEventServer() {
+            super(EVENT);
+        }
+        @Override
+        public String event() {
+            try {
+                Thread.sleep(100L);
+            } catch (InterruptedException e) {
+            }
+            return super.event();
+        }
+    }
+
 }