You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2020/03/12 23:10:32 UTC

[cxf] 01/02: CXF-8215: SSE breaks Pipeline Processing. Fixed the regression (the request filters are called twice for non-SSE endpoints), fixed CDI support

This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch 3.2.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git

commit 1798eb6a3120aa1210ba55dcfdd15b9b1fdd934e
Author: reta <dr...@gmail.com>
AuthorDate: Wed Mar 11 22:40:02 2020 -0400

    CXF-8215: SSE breaks Pipeline Processing. Fixed the regression (the request filters are called twice for non-SSE endpoints), fixed CDI support
    
    (cherry picked from commit 5b59268397d2d8fea820aee2645b96b2cc04a90a)
    (cherry picked from commit f044a02831ad2adff59155924682f73ffdf6213a)
---
 .../cxf/jaxrs/sse/SseEventSinkContextProvider.java | 24 ++++++++--------------
 .../ext/SseTransportCustomizationExtension.java    |  2 ++
 .../cxf/jaxrs/sse/interceptor/SseInterceptor.java  |  6 ++++++
 .../cxf/transport/sse/SseProvidersExtension.java   |  1 -
 .../cxf/systest/jaxrs/sse/AbstractSseTest.java     | 23 ++++++++++++++++++++-
 .../apache/cxf/systest/jaxrs/sse/BookStore.java    |  7 +++++++
 .../apache/cxf/systest/jaxrs/sse/BookStore2.java   |  7 +++++++
 .../systest/jaxrs/sse/BookStoreResponseFilter.java |  6 +++++-
 8 files changed, 57 insertions(+), 19 deletions(-)

diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventSinkContextProvider.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventSinkContextProvider.java
index ed43f08..e11a3fa 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventSinkContextProvider.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventSinkContextProvider.java
@@ -20,9 +20,6 @@
 
 package org.apache.cxf.jaxrs.sse;
 
-import java.util.ArrayList;
-import java.util.Collection;
-
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.ext.MessageBodyWriter;
@@ -30,12 +27,9 @@ import javax.ws.rs.sse.OutboundSseEvent;
 import javax.ws.rs.sse.SseEventSink;
 
 import org.apache.cxf.common.util.PropertyUtils;
-import org.apache.cxf.helpers.CastUtils;
-import org.apache.cxf.interceptor.Interceptor;
 import org.apache.cxf.jaxrs.ext.ContextProvider;
 import org.apache.cxf.jaxrs.impl.AsyncResponseImpl;
 import org.apache.cxf.jaxrs.provider.ServerProviderFactory;
-import org.apache.cxf.jaxrs.sse.interceptor.SseInterceptor;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.transport.http.AbstractHTTPDestination;
 
@@ -53,17 +47,15 @@ public class SseEventSinkContextProvider implements ContextProvider<SseEventSink
         final AsyncResponse async = new AsyncResponseImpl(message);
         final Integer bufferSize = PropertyUtils.getInteger(message, SseEventSinkImpl.BUFFER_SIZE_PROPERTY);
         
-        final Collection<Interceptor<? extends Message>> interceptors = 
-            CastUtils.cast((Collection<?>)message.get(Message.IN_INTERCEPTORS));
-        
-        final Collection<Interceptor<? extends Message>> chain = new ArrayList<>();
-        if (interceptors != null) {
-            chain.addAll(interceptors);
-        }
-        
-        chain.add(new SseInterceptor());
-        message.put(Message.IN_INTERCEPTORS, chain);
+        final SseEventSink sink = createSseEventSink(request, writer, async, bufferSize);
+        message.put(SseEventSink.class, sink);
         
+        return sink;
+    }
+
+    protected SseEventSink createSseEventSink(final HttpServletRequest request,
+            final MessageBodyWriter<OutboundSseEvent> writer,
+            final AsyncResponse async, final Integer bufferSize) {
         if (bufferSize != null) {
             return new SseEventSinkImpl(writer, async, request.getAsyncContext(), bufferSize);
         } else {        
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/ext/SseTransportCustomizationExtension.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/ext/SseTransportCustomizationExtension.java
index c6267d4..04d9f20 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/ext/SseTransportCustomizationExtension.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/ext/SseTransportCustomizationExtension.java
@@ -22,11 +22,13 @@ import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
 import org.apache.cxf.jaxrs.ext.JAXRSServerFactoryCustomizationExtension;
 import org.apache.cxf.jaxrs.sse.SseContextProvider;
 import org.apache.cxf.jaxrs.sse.SseEventSinkContextProvider;
+import org.apache.cxf.jaxrs.sse.interceptor.SseInterceptor;
 
 public class SseTransportCustomizationExtension implements JAXRSServerFactoryCustomizationExtension {
     @Override
     public void customize(final JAXRSServerFactoryBean bean) {
         bean.setProvider(new SseContextProvider());
         bean.setProvider(new SseEventSinkContextProvider());
+        bean.getInInterceptors().add(new SseInterceptor());
     }
 }
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/interceptor/SseInterceptor.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/interceptor/SseInterceptor.java
index 560e10c..0599ee0 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/interceptor/SseInterceptor.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/interceptor/SseInterceptor.java
@@ -28,6 +28,7 @@ import java.util.logging.Logger;
 import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.core.MultivaluedMap;
 import javax.ws.rs.core.Response;
+import javax.ws.rs.sse.SseEventSink;
 
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.jaxrs.impl.ResponseImpl;
@@ -52,6 +53,11 @@ public class SseInterceptor extends AbstractPhaseInterceptor<Message> {
     }
 
     public void handleMessage(Message message) {
+        // Not an SSE invocation, skipping it in favor of normal processing
+        if (message.get(SseEventSink.class) == null) {
+            return;
+        }
+
         if (!isRequestor(message) && message.get(SseInterceptor.class) == null) {
             message.put(SseInterceptor.class, this);
 
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseProvidersExtension.java b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseProvidersExtension.java
index 268db76..3b5784e 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseProvidersExtension.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseProvidersExtension.java
@@ -53,5 +53,4 @@ public class SseProvidersExtension implements BusCreationListener {
         
         bus.getInInterceptors().add(new SseInterceptor());
     }
-    
 }
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
index 435d9a2..0e1158a 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
@@ -37,6 +37,7 @@ import javax.ws.rs.sse.SseEventSource.Builder;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 
+import org.junit.Before;
 import org.junit.Test;
 
 import static org.hamcrest.CoreMatchers.containsString;
@@ -44,6 +45,15 @@ import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.hasItems;
 
 public abstract class AbstractSseTest extends AbstractSseBaseTest {
+    @Before
+    public void setUp() {
+        assertThat(createWebTarget("/rest/api/bookstore/filtered/stats")
+            .request()
+            .put(null)
+            .getStatus(), equalTo(204));
+
+    }
+    
     @Test
     public void testBooksStreamIsReturnedFromLastEventId() throws InterruptedException {
         final WebTarget target = createWebTarget("/rest/api/bookstore/sse/" + UUID.randomUUID())
@@ -106,7 +116,7 @@ public abstract class AbstractSseTest extends AbstractSseBaseTest {
     }
     
     @Test
-    public void testContainerResponseFilterIsCalled() throws InterruptedException {
+    public void testBooksSseContainerResponseFilterIsCalled() throws InterruptedException {
         final WebTarget target = createWebTarget("/rest/api/bookstore/filtered/sse");
         final Collection<Book> books = new ArrayList<>();
 
@@ -200,6 +210,17 @@ public abstract class AbstractSseTest extends AbstractSseBaseTest {
 
         r.close();
     }
+    
+    @Test
+    public void testBooksContainerResponseFilterIsCalled() throws InterruptedException {
+        Response r = createWebClient("/rest/api/bookstore", MediaType.APPLICATION_JSON).get();
+        assertEquals(Status.OK.getStatusCode(), r.getStatus());
+
+        assertThat(createWebTarget("/rest/api/bookstore/filtered/stats")
+            .request()
+            .get(Integer.class), equalTo(1));
+    }
+
 
     @Test
     public void testBooksStreamIsReturnedFromInboundSseEventsNoDelay() throws InterruptedException {
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
index 8978382..41ffd34 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
@@ -28,6 +28,7 @@ import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
 import javax.ws.rs.HeaderParam;
 import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
@@ -172,6 +173,12 @@ public class BookStore extends BookStoreClientCloseable {
     public int filteredStats() {
         return BookStoreResponseFilter.getInvocations();
     }
+    
+    @PUT
+    @Path("/filtered/stats")
+    public void clearStats() {
+        BookStoreResponseFilter.reset();
+    }
 
     @Override
     protected Sse getSse() {
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
index 629d7b8..d7abb04 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
@@ -28,6 +28,7 @@ import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
 import javax.ws.rs.HeaderParam;
 import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
@@ -172,6 +173,12 @@ public class BookStore2 extends BookStoreClientCloseable {
         return BookStoreResponseFilter.getInvocations();
     }
 
+    @PUT
+    @Path("/filtered/stats")
+    public void clearStats() {
+        BookStoreResponseFilter.reset();
+    }
+
     @Override
     protected Sse getSse() {
         return sse;
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStoreResponseFilter.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStoreResponseFilter.java
index ee4417e..0dba7d6 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStoreResponseFilter.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStoreResponseFilter.java
@@ -40,7 +40,7 @@ public class BookStoreResponseFilter implements ContainerResponseFilter {
 
     @Override
     public void filter(ContainerRequestContext reqContext, ContainerResponseContext rspContext) throws IOException {
-        if (uriInfo.getRequestUri().getPath().endsWith("/filtered/sse")) {
+        if (!uriInfo.getRequestUri().getPath().endsWith("/filtered/stats")) {
             counter.incrementAndGet();
         }
     }
@@ -48,4 +48,8 @@ public class BookStoreResponseFilter implements ContainerResponseFilter {
     public static int getInvocations() {
         return counter.get();
     }
+
+    public static void reset() {
+        counter.set(0);
+    }
 }