You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2017/08/15 15:31:37 UTC

cxf git commit: CXF-7085: Introduce support for Server Sent Events (Client). Adding more test cases, multiple bugfixes and improvements

Repository: cxf
Updated Branches:
  refs/heads/master 262ebe6bd -> f84cad4f5


CXF-7085: Introduce support for Server Sent Events (Client). Adding more test cases, multiple bugfixes and improvements


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/f84cad4f
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/f84cad4f
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/f84cad4f

Branch: refs/heads/master
Commit: f84cad4f52af9df97ab40605d0c3a1a54c8ce083
Parents: 262ebe6
Author: reta <dr...@gmail.com>
Authored: Tue Aug 15 11:31:03 2017 -0400
Committer: reta <dr...@gmail.com>
Committed: Tue Aug 15 11:31:03 2017 -0400

----------------------------------------------------------------------
 rt/rs/sse/pom.xml                               |   6 +
 .../jaxrs/sse/client/InboundSseEventImpl.java   |   6 +-
 .../sse/client/InboundSseEventProcessor.java    |   4 +-
 .../jaxrs/sse/client/SseEventSourceImpl.java    |  25 +-
 .../sse/client/SseEventSourceImplTest.java      | 356 +++++++++++++++++++
 5 files changed, 386 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/f84cad4f/rt/rs/sse/pom.xml
----------------------------------------------------------------------
diff --git a/rt/rs/sse/pom.xml b/rt/rs/sse/pom.xml
index c53d504..97aaf83 100644
--- a/rt/rs/sse/pom.xml
+++ b/rt/rs/sse/pom.xml
@@ -67,5 +67,11 @@
             <groupId>org.atmosphere</groupId>
             <artifactId>atmosphere-runtime</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>2.7.14</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/cxf/blob/f84cad4f/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventImpl.java
index 8a9d8aa..f173576 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventImpl.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventImpl.java
@@ -37,7 +37,7 @@ import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.jaxrs.client.ClientProviderFactory;
 import org.apache.cxf.message.Message;
 
-public class InboundSseEventImpl implements InboundSseEvent {
+public final class InboundSseEventImpl implements InboundSseEvent {
     private final String id;
     private final String name;
     private final String comment;
@@ -142,13 +142,13 @@ public class InboundSseEventImpl implements InboundSseEvent {
 
     @Override
     public <T> T readData(Class<T> type) {
-        return read(type, type, MediaType.WILDCARD_TYPE);
+        return read(type, type, MediaType.TEXT_PLAIN_TYPE);
     }
 
     @Override
     @SuppressWarnings("unchecked")
     public <T> T readData(GenericType<T> type) {
-        return read((Class<T>)type.getRawType(), type.getType(), MediaType.WILDCARD_TYPE);
+        return read((Class<T>)type.getRawType(), type.getType(), MediaType.TEXT_PLAIN_TYPE);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cxf/blob/f84cad4f/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
index d105963..2c0b8b5 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java
@@ -56,7 +56,7 @@ public class InboundSseEventProcessor {
     protected InboundSseEventProcessor(Endpoint endpoint, InboundSseEventListener listener) {
         this.endpoint = endpoint;
         this.listener = listener;
-        this.executor = Executors.newSingleThreadExecutor();
+        this.executor = Executors.newSingleThreadScheduledExecutor();
     }
     
     void run(final Response response) {
@@ -94,7 +94,7 @@ public class InboundSseEventProcessor {
                         if (line.startsWith(ID)) {
                             builder.id(line.substring(ID.length()));
                         } else if (line.startsWith(COMMENT)) {
-                            builder.id(line.substring(COMMENT.length()));
+                            builder.comment(line.substring(COMMENT.length()));
                         } else if (line.startsWith(RETRY)) {
                             builder.reconnectDelay(line.substring(RETRY.length()));
                         } else if (line.startsWith(DATA)) {

http://git-wip-us.apache.org/repos/asf/cxf/blob/f84cad4f/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java
index d8b22d4..5da7a5b 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java
@@ -196,11 +196,19 @@ public class SseEventSourceImpl implements SseEventSource {
             // response code. In this case, we should give up.
             if (response.getStatus() == 204) {
                 LOG.fine("SSE endpoint " + target.getUri() + " returns no data, disconnecting");
-                state.compareAndSet(SseSourceState.CONNECTING, SseSourceState.CLOSED);
+                state.set(SseSourceState.CLOSED);
                 response.close();
                 return;
             }
 
+            // Should not happen, but if close() was called from another thread, we could
+            // end up here.
+            if (state.get() == SseSourceState.CLOSED) {
+                LOG.fine("SSE connection to " + target.getUri() + " has been closed already");
+                response.close();
+                return;
+            }
+            
             final Endpoint endpoint = WebClient.getConfig(target).getEndpoint();
             // Create new processor if this is the first time or the old one has been closed 
             if (processor == null || processor.isClosed()) {
@@ -212,7 +220,10 @@ public class SseEventSourceImpl implements SseEventSource {
             processor.run(response);
             LOG.fine("SSE event processor has been started ...");
             
-            state.compareAndSet(SseSourceState.CONNECTING, SseSourceState.OPEN);
+            if (!state.compareAndSet(SseSourceState.CONNECTING, SseSourceState.OPEN)) {
+                throw new IllegalStateException("The SseEventSource is already in " + state.get() + " state");
+            }
+            
             LOG.fine("Successfuly opened SSE connection to " + target.getUri());
         } catch (final Exception ex) {
             if (processor != null) {
@@ -273,10 +284,12 @@ public class SseEventSourceImpl implements SseEventSource {
         }
         
         // If the connection was still on connecting state, just try to reconnect
-        if (state.get() != SseSourceState.CONNECTING 
-            && !state.compareAndSet(SseSourceState.OPEN, SseSourceState.CONNECTING)) {
-            throw new IllegalStateException("The SseEventSource is not opened, but in " + state.get()
-                + " state, unable to reconnect");
+        if (state.get() != SseSourceState.CONNECTING) { 
+            LOG.fine("The SseEventSource is still opened, moving it to connecting state");
+            if (!state.compareAndSet(SseSourceState.OPEN, SseSourceState.CONNECTING)) {
+                throw new IllegalStateException("The SseEventSource is not opened, but in " + state.get()
+                    + " state, unable to reconnect");
+            }
         }
                 
         executor.schedule(() -> {

http://git-wip-us.apache.org/repos/asf/cxf/blob/f84cad4f/rt/rs/sse/src/test/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImplTest.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/test/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImplTest.java b/rt/rs/sse/src/test/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImplTest.java
new file mode 100644
index 0000000..9d28972
--- /dev/null
+++ b/rt/rs/sse/src/test/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImplTest.java
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.sse.client;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.client.Invocation;
+import javax.ws.rs.core.Configuration;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.sse.InboundSseEvent;
+import javax.ws.rs.sse.SseEventSource;
+
+import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.jaxrs.client.ClientConfiguration;
+import org.apache.cxf.jaxrs.client.ClientProviderFactory;
+import org.apache.cxf.jaxrs.client.WebClient;
+import org.apache.cxf.jaxrs.client.spec.ClientImpl.WebTargetImpl;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class SseEventSourceImplTest extends Assert {
+    private static final String EVENT = "    event\n"
+        + "id: 1\n"
+        + "data: test data\n"
+        + "retry: 10000\n"
+        + ": test comment\n"
+        + "\n";
+    
+    private static final String EVENT_NO_RETRY = "    event\n"
+        + "id: 1\n"
+        + "data: test data\n"
+        + ": test comment\n"
+        + "\n";
+    
+    private static final String EVENT_BAD_RETRY = "    event\n"
+        + "id: 1\n"
+        + "data: test data\n"
+        + "retry: blba\n"
+        + ": test comment\n"
+        + "\n";
+    
+    @Rule
+    public ExpectedException exception = ExpectedException.none();
+    
+    private final ExecutorService executor = Executors.newSingleThreadExecutor();
+    private final ClientProviderFactory clientProviderFactory = ClientProviderFactory.createInstance(null);
+    
+    /**
+     * Subclass the WebClient to augment the visibility of getConfiguration() method.
+     */
+    private static class TestWebClient extends WebClient {
+        TestWebClient(URI baseURI) {
+            super(baseURI);
+        }
+        
+        @Override
+        public ClientConfiguration getConfiguration() {
+            return super.getConfiguration();
+        }
+    }
+    
+    @Mock
+    private TestWebClient client;
+    @Mock
+    private ClientConfiguration clientConfiguration;
+    @Mock
+    private WebTargetImpl target;
+    @Mock
+    private Configuration configuration;
+    @Mock
+    private Invocation.Builder builder;
+    @Mock
+    private Endpoint endpoint;
+    @Mock
+    private Response response;
+
+    @SuppressWarnings("unchecked")
+    @Before
+    public void setUp() {
+        when(target.getConfiguration()).thenReturn(configuration);
+        when(target.getWebClient()).thenReturn(client);
+        when(target.request(MediaType.SERVER_SENT_EVENTS)).thenReturn(builder);
+        when(builder.headers(any(MultivaluedMap.class))).thenReturn(builder);
+        when(builder.get()).thenReturn(response);
+        when(client.getConfiguration()).thenReturn(clientConfiguration);
+        when(clientConfiguration.getEndpoint()).thenReturn(endpoint);
+        when(endpoint.get("org.apache.cxf.jaxrs.client.ClientProviderFactory")).thenReturn(clientProviderFactory);
+    }
+    
+    @After
+    public void tearDown() throws InterruptedException {
+        executor.shutdown();
+        executor.awaitTermination(1, TimeUnit.SECONDS);
+    }
+    
+    @Test
+    public void testNoReconnectWhenNoContentIsReturned() {
+        // Verify that 204 response code won't force reconnect
+        when(response.getStatus()).thenReturn(204);
+        
+        try (SseEventSource eventSource = withNoReconnect()) {
+            eventSource.open();
+            assertThat(eventSource.isOpen(), equalTo(false));
+            verify(response, times(1)).getStatus();
+        }
+    }
+    
+    @Test
+    public void testReuseSameEventSourceSeveralTimes() {
+        // Verify that the same event source can be opened again after a 204 response
+        when(response.getStatus()).thenReturn(204);
+        
+        try (SseEventSource eventSource = withNoReconnect()) {
+            eventSource.open();
+            assertThat(eventSource.isOpen(), equalTo(false));
+            verify(response, times(1)).getStatus();
+
+            eventSource.open();
+            assertThat(eventSource.isOpen(), equalTo(false));
+            verify(response, times(2)).getStatus();
+        }
+    }
+
+    @Test
+    public void testReconnectWillBeScheduledOnError() throws InterruptedException {
+        when(builder.get()).thenThrow(new RuntimeException("Connection refused"));
+        
+        try (SseEventSource eventSource = withReconnect()) {
+            eventSource.open();
+            assertThat(eventSource.isOpen(), equalTo(false));
+            
+            // Sleep a little bit for reconnect to reschedule
+            Thread.sleep(150);
+            verify(builder, atLeast(2)).get();
+        }
+    }
+    
+    @Test
+    public void testNoReconnectWillBeScheduledWhenClosed() throws InterruptedException {
+        when(builder.get()).thenThrow(new RuntimeException("Connection refused"));
+        
+        try (SseEventSource eventSource = withReconnect()) {
+            eventSource.open();
+            assertThat(eventSource.isOpen(), equalTo(false));
+            eventSource.close(1, TimeUnit.SECONDS);
+            
+            // Sleep past the reconnect delay (100ms) to make sure no reconnect was scheduled
+            Thread.sleep(150);
+            verify(builder, times(1)).get();
+        }
+    }
+    
+    @Test
+    public void testWhenTryToConnectTwiceSecondAttemtShouldFail() throws InterruptedException, ExecutionException {
+        when(builder.get()).then(invocation -> {
+            Thread.sleep(100);
+            return response;
+        });
+        
+        try (SseEventSource eventSource = withReconnect()) {
+            eventSource.open();
+            
+            // The attempt to open the SSE connection in another thread at the same 
+            // time should fail 
+            final Future<?> future = executor.submit(() -> eventSource.open());
+            exception.expectCause(instanceOf(IllegalStateException.class));
+            assertThat(future.get(), equalTo(null));
+            
+            assertThat(eventSource.isOpen(), equalTo(true));
+            verify(builder, times(1)).get();
+        }
+    }
+    
+    @Test
+    public void testNoReconnectAndOneEventReceived() throws InterruptedException, IOException {
+        try (InputStream is = new ByteArrayInputStream(EVENT.getBytes(StandardCharsets.UTF_8))) {
+            when(response.getStatus()).thenReturn(200);
+            when(response.readEntity(InputStream.class)).thenReturn(is);
+            
+            final List<InboundSseEvent> events = new ArrayList<>();
+            try (SseEventSource eventSource = withNoReconnect()) {
+                eventSource.register(events::add);
+                eventSource.open();
+                
+                assertThat(eventSource.isOpen(), equalTo(true));
+                verify(response, times(1)).getStatus();
+                
+                // Allow the event processor to pull for events (150ms)
+                Thread.sleep(150);
+            }
+            
+            assertThat(events.size(), equalTo(1));
+            assertThat(events.get(0).getId(), equalTo("1"));
+            assertThat(events.get(0).getReconnectDelay(), equalTo(10000L));
+            assertThat(events.get(0).getComment(), equalTo("test comment"));
+        }
+    }
+    
+    @Test
+    public void testReconnectAndTwoEventsReceived() throws InterruptedException, IOException {
+        final Collection<InputStream> closeables = new ArrayList<>();
+        
+        try {
+            when(response.getStatus()).thenReturn(200);
+            when(response.readEntity(InputStream.class)).then(Invocation -> {
+                final InputStream is = new ByteArrayInputStream(EVENT_NO_RETRY.getBytes(StandardCharsets.UTF_8));
+                closeables.add(is);
+                return is;
+            });
+            
+            final List<InboundSseEvent> events = new ArrayList<>();
+            try (SseEventSource eventSource = withReconnect()) {
+                eventSource.register(events::add);
+                eventSource.open();
+                
+                assertThat(eventSource.isOpen(), equalTo(true));
+                //verify(response, times(1)).getStatus();
+                
+                // Allow the event processor to pull for events (150ms)
+                Thread.sleep(150);
+            }
+            
+            assertThat(events.size(), equalTo(2));
+            assertThat(events.get(0).getId(), equalTo("1"));
+            assertThat(events.get(0).getComment(), equalTo("test comment"));
+            assertThat(events.get(1).getId(), equalTo("1"));
+            assertThat(events.get(1).getComment(), equalTo("test comment"));
+        } finally {
+            for (final InputStream is: closeables) {
+                is.close();
+            }
+        }
+    }
+    
+    @Test
+    public void testNoReconnectAndCloseTheStreamWhileEventIsBeingReceived() throws InterruptedException, IOException {
+        when(response.getStatus()).thenReturn(200);
+        when(response.readEntity(InputStream.class)).then(invocation -> {
+            Thread.sleep(200);
+            return null;
+        });
+        
+        final List<InboundSseEvent> events = new ArrayList<>();
+        try (SseEventSource eventSource = withNoReconnect()) {
+            eventSource.register(events::add);
+            eventSource.open();
+            
+            assertThat(eventSource.isOpen(), equalTo(true));
+            verify(response, times(1)).getStatus();
+            
+            // Close the source while the stubbed readEntity() call is still sleeping (200ms)
+            Thread.sleep(50);
+            assertThat(eventSource.close(100, TimeUnit.MILLISECONDS), equalTo(true));
+            assertThat(eventSource.isOpen(), equalTo(false));
+        }
+    }
+    
+    @Test
+    public void testInvalidReconnectDelayInTheEvent() throws InterruptedException, IOException {
+        try (InputStream is = new ByteArrayInputStream(EVENT_BAD_RETRY.getBytes(StandardCharsets.UTF_8))) {
+            when(response.getStatus()).thenReturn(200);
+            when(response.readEntity(InputStream.class)).thenReturn(is);
+            
+            final List<InboundSseEvent> events = new ArrayList<>();
+            try (SseEventSource eventSource = withNoReconnect()) {
+                eventSource.register(events::add);
+                eventSource.open();
+                
+                assertThat(eventSource.isOpen(), equalTo(true));
+                verify(response, times(1)).getStatus();
+                
+                // Allow the event processor to pull for events (150ms)
+                Thread.sleep(150);
+            }
+            
+            assertThat(events.size(), equalTo(1));
+            assertThat(events.get(0).getId(), equalTo("1"));
+            assertThat(events.get(0).getReconnectDelay(), equalTo(-1L));
+            assertThat(events.get(0).getComment(), equalTo("test comment"));
+        }
+    }
+
+    @Test
+    public void testTryToCloseWhileConnecting() throws ExecutionException, InterruptedException {
+        when(response.getStatus()).thenReturn(200);
+        when(builder.get()).then(invocation -> {
+            Thread.sleep(200);
+            return response;
+        });
+        
+        try (SseEventSource eventSource = withNoReconnect()) {
+            final Future<?> future = executor.submit(() -> eventSource.open());
+            
+            // Wait a bit for open() to advance
+            Thread.sleep(100);
+            eventSource.close();
+            
+            assertThat(future.get(), equalTo(null));
+            assertThat(eventSource.isOpen(), equalTo(false));
+        }
+    }
+    
+    private SseEventSource withNoReconnect() {
+        return SseEventSource.target(target).build();
+    }
+    
+    private SseEventSource withReconnect() {
+        return SseEventSource.target(target).reconnectingEvery(100, TimeUnit.MILLISECONDS).build();
+    }
+}