You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2017/08/15 15:31:37 UTC
cxf git commit: CXF-7085: Introduce support for Server Sent Events
(Client). Adding more test cases, multiple bugfixes and improvements
Repository: cxf
Updated Branches:
refs/heads/master 262ebe6bd -> f84cad4f5
CXF-7085: Introduce support for Server Sent Events (Client). Adding more test cases, multiple bugfixes and improvements
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/f84cad4f
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/f84cad4f
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/f84cad4f
Branch: refs/heads/master
Commit: f84cad4f52af9df97ab40605d0c3a1a54c8ce083
Parents: 262ebe6
Author: reta <dr...@gmail.com>
Authored: Tue Aug 15 11:31:03 2017 -0400
Committer: reta <dr...@gmail.com>
Committed: Tue Aug 15 11:31:03 2017 -0400
----------------------------------------------------------------------
rt/rs/sse/pom.xml | 6 +
.../jaxrs/sse/client/InboundSseEventImpl.java | 6 +-
.../sse/client/InboundSseEventProcessor.java | 4 +-
.../jaxrs/sse/client/SseEventSourceImpl.java | 25 +-
.../sse/client/SseEventSourceImplTest.java | 356 +++++++++++++++++++
5 files changed, 386 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cxf/blob/f84cad4f/rt/rs/sse/pom.xml
----------------------------------------------------------------------
diff --git a/rt/rs/sse/pom.xml b/rt/rs/sse/pom.xml
index c53d504..97aaf83 100644
--- a/rt/rs/sse/pom.xml
+++ b/rt/rs/sse/pom.xml
@@ -67,5 +67,11 @@
<groupId>org.atmosphere</groupId>
<artifactId>atmosphere-runtime</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>2.7.14</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/cxf/blob/f84cad4f/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventImpl.java
index 8a9d8aa..f173576 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventImpl.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventImpl.java
@@ -37,7 +37,7 @@ import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.jaxrs.client.ClientProviderFactory;
import org.apache.cxf.message.Message;
-public class InboundSseEventImpl implements InboundSseEvent {
+public final class InboundSseEventImpl implements InboundSseEvent {
private final String id;
private final String name;
private final String comment;
@@ -142,13 +142,13 @@ public class InboundSseEventImpl implements InboundSseEvent {
@Override
public <T> T readData(Class<T> type) {
- return read(type, type, MediaType.WILDCARD_TYPE);
+ return read(type, type, MediaType.TEXT_PLAIN_TYPE);
}
@Override
@SuppressWarnings("unchecked")
public <T> T readData(GenericType<T> type) {
- return read((Class<T>)type.getRawType(), type.getType(), MediaType.WILDCARD_TYPE);
+ return read((Class<T>)type.getRawType(), type.getType(), MediaType.TEXT_PLAIN_TYPE);
}
@Override
http://git-wip-us.apache.org/repos/asf/cxf/blob/f84cad4f/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
index d105963..2c0b8b5 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
@@ -56,7 +56,7 @@ public class InboundSseEventProcessor {
protected InboundSseEventProcessor(Endpoint endpoint, InboundSseEventListener listener) {
this.endpoint = endpoint;
this.listener = listener;
- this.executor = Executors.newSingleThreadExecutor();
+ this.executor = Executors.newSingleThreadScheduledExecutor();
}
void run(final Response response) {
@@ -94,7 +94,7 @@ public class InboundSseEventProcessor {
if (line.startsWith(ID)) {
builder.id(line.substring(ID.length()));
} else if (line.startsWith(COMMENT)) {
- builder.id(line.substring(COMMENT.length()));
+ builder.comment(line.substring(COMMENT.length()));
} else if (line.startsWith(RETRY)) {
builder.reconnectDelay(line.substring(RETRY.length()));
} else if (line.startsWith(DATA)) {
http://git-wip-us.apache.org/repos/asf/cxf/blob/f84cad4f/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java
index d8b22d4..5da7a5b 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java
@@ -196,11 +196,19 @@ public class SseEventSourceImpl implements SseEventSource {
// response code. In this case, we should give up.
if (response.getStatus() == 204) {
LOG.fine("SSE endpoint " + target.getUri() + " returns no data, disconnecting");
- state.compareAndSet(SseSourceState.CONNECTING, SseSourceState.CLOSED);
+ state.set(SseSourceState.CLOSED);
response.close();
return;
}
+ // Should not happen, but if close() was called from another thread we could
+ // end up here.
+ if (state.get() == SseSourceState.CLOSED) {
+ LOG.fine("SSE connection to " + target.getUri() + " has been closed already");
+ response.close();
+ return;
+ }
+
final Endpoint endpoint = WebClient.getConfig(target).getEndpoint();
// Create new processor if this is the first time or the old one has been closed
if (processor == null || processor.isClosed()) {
@@ -212,7 +220,10 @@ public class SseEventSourceImpl implements SseEventSource {
processor.run(response);
LOG.fine("SSE event processor has been started ...");
- state.compareAndSet(SseSourceState.CONNECTING, SseSourceState.OPEN);
+ if (!state.compareAndSet(SseSourceState.CONNECTING, SseSourceState.OPEN)) {
+ throw new IllegalStateException("The SseEventSource is already in " + state.get() + " state");
+ }
+
LOG.fine("Successfuly opened SSE connection to " + target.getUri());
} catch (final Exception ex) {
if (processor != null) {
@@ -273,10 +284,12 @@ public class SseEventSourceImpl implements SseEventSource {
}
// If the connection was still on connecting state, just try to reconnect
- if (state.get() != SseSourceState.CONNECTING
- && !state.compareAndSet(SseSourceState.OPEN, SseSourceState.CONNECTING)) {
- throw new IllegalStateException("The SseEventSource is not opened, but in " + state.get()
- + " state, unable to reconnect");
+ if (state.get() != SseSourceState.CONNECTING) {
+ LOG.fine("The SseEventSource is still opened, moving it to connecting state");
+ if (!state.compareAndSet(SseSourceState.OPEN, SseSourceState.CONNECTING)) {
+ throw new IllegalStateException("The SseEventSource is not opened, but in " + state.get()
+ + " state, unable to reconnect");
+ }
}
executor.schedule(() -> {
http://git-wip-us.apache.org/repos/asf/cxf/blob/f84cad4f/rt/rs/sse/src/test/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImplTest.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/test/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImplTest.java b/rt/rs/sse/src/test/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImplTest.java
new file mode 100644
index 0000000..9d28972
--- /dev/null
+++ b/rt/rs/sse/src/test/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImplTest.java
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.sse.client;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.client.Invocation;
+import javax.ws.rs.core.Configuration;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.sse.InboundSseEvent;
+import javax.ws.rs.sse.SseEventSource;
+
+import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.jaxrs.client.ClientConfiguration;
+import org.apache.cxf.jaxrs.client.ClientProviderFactory;
+import org.apache.cxf.jaxrs.client.WebClient;
+import org.apache.cxf.jaxrs.client.spec.ClientImpl.WebTargetImpl;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class SseEventSourceImplTest extends Assert {
+ private static final String EVENT = " event\n"
+ + "id: 1\n"
+ + "data: test data\n"
+ + "retry: 10000\n"
+ + ": test comment\n"
+ + "\n";
+
+ private static final String EVENT_NO_RETRY = " event\n"
+ + "id: 1\n"
+ + "data: test data\n"
+ + ": test comment\n"
+ + "\n";
+
+ private static final String EVENT_BAD_RETRY = " event\n"
+ + "id: 1\n"
+ + "data: test data\n"
+ + "retry: blba\n"
+ + ": test comment\n"
+ + "\n";
+
+ @Rule
+ public ExpectedException exception = ExpectedException.none();
+
+ private final ExecutorService executor = Executors.newSingleThreadExecutor();
+ private final ClientProviderFactory clientProviderFactory = ClientProviderFactory.createInstance(null);
+
+ /**
+ * Subclass the WebClient to augment the visibility of getConfiguration() method.
+ */
+ private static class TestWebClient extends WebClient {
+ TestWebClient(URI baseURI) {
+ super(baseURI);
+ }
+
+ @Override
+ public ClientConfiguration getConfiguration() {
+ return super.getConfiguration();
+ }
+ }
+
+ @Mock
+ private TestWebClient client;
+ @Mock
+ private ClientConfiguration clientConfiguration;
+ @Mock
+ private WebTargetImpl target;
+ @Mock
+ private Configuration configuration;
+ @Mock
+ private Invocation.Builder builder;
+ @Mock
+ private Endpoint endpoint;
+ @Mock
+ private Response response;
+
+ @SuppressWarnings("unchecked")
+ @Before
+ public void setUp() {
+ when(target.getConfiguration()).thenReturn(configuration);
+ when(target.getWebClient()).thenReturn(client);
+ when(target.request(MediaType.SERVER_SENT_EVENTS)).thenReturn(builder);
+ when(builder.headers(any(MultivaluedMap.class))).thenReturn(builder);
+ when(builder.get()).thenReturn(response);
+ when(client.getConfiguration()).thenReturn(clientConfiguration);
+ when(clientConfiguration.getEndpoint()).thenReturn(endpoint);
+ when(endpoint.get("org.apache.cxf.jaxrs.client.ClientProviderFactory")).thenReturn(clientProviderFactory);
+ }
+
+ @After
+ public void tearDown() throws InterruptedException {
+ executor.shutdown();
+ executor.awaitTermination(1, TimeUnit.SECONDS);
+ }
+
+ @Test
+ public void testNoReconnectWhenNoContentIsReturned() {
+ // Verify that 204 response code won't force reconnect
+ when(response.getStatus()).thenReturn(204);
+
+ try (SseEventSource eventSource = withNoReconnect()) {
+ eventSource.open();
+ assertThat(eventSource.isOpen(), equalTo(false));
+ verify(response, times(1)).getStatus();
+ }
+ }
+
+ @Test
+ public void testReuseSameEventSourceSeveralTimes() {
+ // Verify that the same event source can be opened again after a 204 response
+ when(response.getStatus()).thenReturn(204);
+
+ try (SseEventSource eventSource = withNoReconnect()) {
+ eventSource.open();
+ assertThat(eventSource.isOpen(), equalTo(false));
+ verify(response, times(1)).getStatus();
+
+ eventSource.open();
+ assertThat(eventSource.isOpen(), equalTo(false));
+ verify(response, times(2)).getStatus();
+ }
+ }
+
+ @Test
+ public void testReconnectWillBeScheduledOnError() throws InterruptedException {
+ when(builder.get()).thenThrow(new RuntimeException("Connection refused"));
+
+ try (SseEventSource eventSource = withReconnect()) {
+ eventSource.open();
+ assertThat(eventSource.isOpen(), equalTo(false));
+
+ // Sleep past the 100ms reconnect delay so that a reconnect attempt gets scheduled
+ Thread.sleep(150);
+ verify(builder, atLeast(2)).get();
+ }
+ }
+
+ @Test
+ public void testNoReconnectWillBeScheduledWhenClosed() throws InterruptedException {
+ when(builder.get()).thenThrow(new RuntimeException("Connection refused"));
+
+ try (SseEventSource eventSource = withReconnect()) {
+ eventSource.open();
+ assertThat(eventSource.isOpen(), equalTo(false));
+ eventSource.close(1, TimeUnit.SECONDS);
+
+ // Sleep past the 100ms reconnect delay to make sure no reconnect was scheduled after close
+ Thread.sleep(150);
+ verify(builder, times(1)).get();
+ }
+ }
+
+ @Test
+ public void testWhenTryToConnectTwiceSecondAttemtShouldFail() throws InterruptedException, ExecutionException {
+ when(builder.get()).then(invocation -> {
+ Thread.sleep(100);
+ return response;
+ });
+
+ try (SseEventSource eventSource = withReconnect()) {
+ eventSource.open();
+
+ // The attempt to open the SSE connection in another thread at the same
+ // time should fail
+ final Future<?> future = executor.submit(() -> eventSource.open());
+ exception.expectCause(instanceOf(IllegalStateException.class));
+ assertThat(future.get(), equalTo(null));
+
+ assertThat(eventSource.isOpen(), equalTo(true));
+ verify(builder, times(1)).get();
+ }
+ }
+
+ @Test
+ public void testNoReconnectAndOneEventReceived() throws InterruptedException, IOException {
+ try (InputStream is = new ByteArrayInputStream(EVENT.getBytes(StandardCharsets.UTF_8))) {
+ when(response.getStatus()).thenReturn(200);
+ when(response.readEntity(InputStream.class)).thenReturn(is);
+
+ final List<InboundSseEvent> events = new ArrayList<>();
+ try (SseEventSource eventSource = withNoReconnect()) {
+ eventSource.register(events::add);
+ eventSource.open();
+
+ assertThat(eventSource.isOpen(), equalTo(true));
+ verify(response, times(1)).getStatus();
+
+ // Allow the event processor to pull for events (150ms)
+ Thread.sleep(150);
+ }
+
+ assertThat(events.size(), equalTo(1));
+ assertThat(events.get(0).getId(), equalTo("1"));
+ assertThat(events.get(0).getReconnectDelay(), equalTo(10000L));
+ assertThat(events.get(0).getComment(), equalTo("test comment"));
+ }
+ }
+
+ @Test
+ public void testReconnectAndTwoEventsReceived() throws InterruptedException, IOException {
+ final Collection<InputStream> closeables = new ArrayList<>();
+
+ try {
+ when(response.getStatus()).thenReturn(200);
+ when(response.readEntity(InputStream.class)).then(Invocation -> {
+ final InputStream is = new ByteArrayInputStream(EVENT_NO_RETRY.getBytes(StandardCharsets.UTF_8));
+ closeables.add(is);
+ return is;
+ });
+
+ final List<InboundSseEvent> events = new ArrayList<>();
+ try (SseEventSource eventSource = withReconnect()) {
+ eventSource.register(events::add);
+ eventSource.open();
+
+ assertThat(eventSource.isOpen(), equalTo(true));
+ //verify(response, times(1)).getStatus();
+
+ // Allow the event processor to pull for events (150ms)
+ Thread.sleep(150);
+ }
+
+ assertThat(events.size(), equalTo(2));
+ assertThat(events.get(0).getId(), equalTo("1"));
+ assertThat(events.get(0).getComment(), equalTo("test comment"));
+ assertThat(events.get(1).getId(), equalTo("1"));
+ assertThat(events.get(1).getComment(), equalTo("test comment"));
+ } finally {
+ for (final InputStream is: closeables) {
+ is.close();
+ }
+ }
+ }
+
+ @Test
+ public void testNoReconnectAndCloseTheStreamWhileEventIsBeingReceived() throws InterruptedException, IOException {
+ when(response.getStatus()).thenReturn(200);
+ when(response.readEntity(InputStream.class)).then(invocation -> {
+ Thread.sleep(200);
+ return null;
+ });
+
+ final List<InboundSseEvent> events = new ArrayList<>();
+ try (SseEventSource eventSource = withNoReconnect()) {
+ eventSource.register(events::add);
+ eventSource.open();
+
+ assertThat(eventSource.isOpen(), equalTo(true));
+ verify(response, times(1)).getStatus();
+
+ // Wait 50ms (less than the 200ms the mocked readEntity() sleeps) before closing
+ Thread.sleep(50);
+ assertThat(eventSource.close(100, TimeUnit.MILLISECONDS), equalTo(true));
+ assertThat(eventSource.isOpen(), equalTo(false));
+ }
+ }
+
+ @Test
+ public void testInvalidReconnectDelayInTheEvent() throws InterruptedException, IOException {
+ try (InputStream is = new ByteArrayInputStream(EVENT_BAD_RETRY.getBytes(StandardCharsets.UTF_8))) {
+ when(response.getStatus()).thenReturn(200);
+ when(response.readEntity(InputStream.class)).thenReturn(is);
+
+ final List<InboundSseEvent> events = new ArrayList<>();
+ try (SseEventSource eventSource = withNoReconnect()) {
+ eventSource.register(events::add);
+ eventSource.open();
+
+ assertThat(eventSource.isOpen(), equalTo(true));
+ verify(response, times(1)).getStatus();
+
+ // Allow the event processor to pull for events (150ms)
+ Thread.sleep(150);
+ }
+
+ assertThat(events.size(), equalTo(1));
+ assertThat(events.get(0).getId(), equalTo("1"));
+ assertThat(events.get(0).getReconnectDelay(), equalTo(-1L));
+ assertThat(events.get(0).getComment(), equalTo("test comment"));
+ }
+ }
+
+ @Test
+ public void testTryToCloseWhileConnecting() throws ExecutionException, InterruptedException {
+ when(response.getStatus()).thenReturn(200);
+ when(builder.get()).then(invocation -> {
+ Thread.sleep(200);
+ return response;
+ });
+
+ try (SseEventSource eventSource = withNoReconnect()) {
+ final Future<?> future = executor.submit(() -> eventSource.open());
+
+ // Wait a bit for open() to advance
+ Thread.sleep(100);
+ eventSource.close();
+
+ assertThat(future.get(), equalTo(null));
+ assertThat(eventSource.isOpen(), equalTo(false));
+ }
+ }
+
+ private SseEventSource withNoReconnect() {
+ return SseEventSource.target(target).build();
+ }
+
+ private SseEventSource withReconnect() {
+ return SseEventSource.target(target).reconnectingEvery(100, TimeUnit.MILLISECONDS).build();
+ }
+}