You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2020/03/12 22:30:26 UTC
[cxf] branch 3.3.x-fixes updated: CXF-8215: SSE breaks Pipeline
Processing. Fixed the regression (the request filters are called twice for
non-SSE endpoints), fixed CDI support
This is an automated email from the ASF dual-hosted git repository.
reta pushed a commit to branch 3.3.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git
The following commit(s) were added to refs/heads/3.3.x-fixes by this push:
new f044a02 CXF-8215: SSE breaks Pipeline Processing. Fixed the regression (the request filters are called twice for non-SSE endpoints), fixed CDI support
f044a02 is described below
commit f044a02831ad2adff59155924682f73ffdf6213a
Author: reta <dr...@gmail.com>
AuthorDate: Wed Mar 11 22:40:02 2020 -0400
CXF-8215: SSE breaks Pipeline Processing. Fixed the regression (the request filters are called twice for non-SSE endpoints), fixed CDI support
(cherry picked from commit 5b59268397d2d8fea820aee2645b96b2cc04a90a)
---
.../cxf/jaxrs/sse/SseEventSinkContextProvider.java | 24 ++++++++--------------
.../ext/SseTransportCustomizationExtension.java | 2 ++
.../cxf/jaxrs/sse/interceptor/SseInterceptor.java | 6 ++++++
.../cxf/transport/sse/SseProvidersExtension.java | 1 -
.../cxf/systest/jaxrs/sse/AbstractSseTest.java | 23 ++++++++++++++++++++-
.../apache/cxf/systest/jaxrs/sse/BookStore.java | 7 +++++++
.../apache/cxf/systest/jaxrs/sse/BookStore2.java | 7 +++++++
.../systest/jaxrs/sse/BookStoreResponseFilter.java | 6 +++++-
8 files changed, 57 insertions(+), 19 deletions(-)
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventSinkContextProvider.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventSinkContextProvider.java
index ed43f08..e11a3fa 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventSinkContextProvider.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventSinkContextProvider.java
@@ -20,9 +20,6 @@
package org.apache.cxf.jaxrs.sse;
-import java.util.ArrayList;
-import java.util.Collection;
-
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.ext.MessageBodyWriter;
@@ -30,12 +27,9 @@ import javax.ws.rs.sse.OutboundSseEvent;
import javax.ws.rs.sse.SseEventSink;
import org.apache.cxf.common.util.PropertyUtils;
-import org.apache.cxf.helpers.CastUtils;
-import org.apache.cxf.interceptor.Interceptor;
import org.apache.cxf.jaxrs.ext.ContextProvider;
import org.apache.cxf.jaxrs.impl.AsyncResponseImpl;
import org.apache.cxf.jaxrs.provider.ServerProviderFactory;
-import org.apache.cxf.jaxrs.sse.interceptor.SseInterceptor;
import org.apache.cxf.message.Message;
import org.apache.cxf.transport.http.AbstractHTTPDestination;
@@ -53,17 +47,15 @@ public class SseEventSinkContextProvider implements ContextProvider<SseEventSink
final AsyncResponse async = new AsyncResponseImpl(message);
final Integer bufferSize = PropertyUtils.getInteger(message, SseEventSinkImpl.BUFFER_SIZE_PROPERTY);
- final Collection<Interceptor<? extends Message>> interceptors =
- CastUtils.cast((Collection<?>)message.get(Message.IN_INTERCEPTORS));
-
- final Collection<Interceptor<? extends Message>> chain = new ArrayList<>();
- if (interceptors != null) {
- chain.addAll(interceptors);
- }
-
- chain.add(new SseInterceptor());
- message.put(Message.IN_INTERCEPTORS, chain);
+ final SseEventSink sink = createSseEventSink(request, writer, async, bufferSize);
+ message.put(SseEventSink.class, sink);
+ return sink;
+ }
+
+ protected SseEventSink createSseEventSink(final HttpServletRequest request,
+ final MessageBodyWriter<OutboundSseEvent> writer,
+ final AsyncResponse async, final Integer bufferSize) {
if (bufferSize != null) {
return new SseEventSinkImpl(writer, async, request.getAsyncContext(), bufferSize);
} else {
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/ext/SseTransportCustomizationExtension.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/ext/SseTransportCustomizationExtension.java
index c6267d4..04d9f20 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/ext/SseTransportCustomizationExtension.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/ext/SseTransportCustomizationExtension.java
@@ -22,11 +22,13 @@ import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
import org.apache.cxf.jaxrs.ext.JAXRSServerFactoryCustomizationExtension;
import org.apache.cxf.jaxrs.sse.SseContextProvider;
import org.apache.cxf.jaxrs.sse.SseEventSinkContextProvider;
+import org.apache.cxf.jaxrs.sse.interceptor.SseInterceptor;
public class SseTransportCustomizationExtension implements JAXRSServerFactoryCustomizationExtension {
@Override
public void customize(final JAXRSServerFactoryBean bean) {
bean.setProvider(new SseContextProvider());
bean.setProvider(new SseEventSinkContextProvider());
+ bean.getInInterceptors().add(new SseInterceptor());
}
}
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/interceptor/SseInterceptor.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/interceptor/SseInterceptor.java
index 560e10c..0599ee0 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/interceptor/SseInterceptor.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/interceptor/SseInterceptor.java
@@ -28,6 +28,7 @@ import java.util.logging.Logger;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
+import javax.ws.rs.sse.SseEventSink;
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.jaxrs.impl.ResponseImpl;
@@ -52,6 +53,11 @@ public class SseInterceptor extends AbstractPhaseInterceptor<Message> {
}
public void handleMessage(Message message) {
+ // Not an SSE invocation, skipping it in favor of normal processing
+ if (message.get(SseEventSink.class) == null) {
+ return;
+ }
+
if (!isRequestor(message) && message.get(SseInterceptor.class) == null) {
message.put(SseInterceptor.class, this);
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseProvidersExtension.java b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseProvidersExtension.java
index 268db76..3b5784e 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseProvidersExtension.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseProvidersExtension.java
@@ -53,5 +53,4 @@ public class SseProvidersExtension implements BusCreationListener {
bus.getInInterceptors().add(new SseInterceptor());
}
-
}
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
index e79571b..370d39b 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
@@ -37,6 +37,7 @@ import javax.ws.rs.sse.SseEventSource.Builder;
import com.fasterxml.jackson.core.JsonProcessingException;
+import org.junit.Before;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.containsString;
@@ -47,6 +48,15 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public abstract class AbstractSseTest extends AbstractSseBaseTest {
+ @Before
+ public void setUp() {
+ assertThat(createWebTarget("/rest/api/bookstore/filtered/stats")
+ .request()
+ .put(null)
+ .getStatus(), equalTo(204));
+
+ }
+
@Test
public void testBooksStreamIsReturnedFromLastEventId() throws InterruptedException {
final WebTarget target = createWebTarget("/rest/api/bookstore/sse/" + UUID.randomUUID())
@@ -109,7 +119,7 @@ public abstract class AbstractSseTest extends AbstractSseBaseTest {
}
@Test
- public void testContainerResponseFilterIsCalled() throws InterruptedException {
+ public void testBooksSseContainerResponseFilterIsCalled() throws InterruptedException {
final WebTarget target = createWebTarget("/rest/api/bookstore/filtered/sse");
final Collection<Book> books = new ArrayList<>();
@@ -203,6 +213,17 @@ public abstract class AbstractSseTest extends AbstractSseBaseTest {
r.close();
}
+
+ @Test
+ public void testBooksContainerResponseFilterIsCalled() throws InterruptedException {
+ Response r = createWebClient("/rest/api/bookstore", MediaType.APPLICATION_JSON).get();
+ assertEquals(Status.OK.getStatusCode(), r.getStatus());
+
+ assertThat(createWebTarget("/rest/api/bookstore/filtered/stats")
+ .request()
+ .get(Integer.class), equalTo(1));
+ }
+
@Test
public void testBooksStreamIsReturnedFromInboundSseEventsNoDelay() throws InterruptedException {
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
index 8978382..41ffd34 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
@@ -28,6 +28,7 @@ import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
@@ -172,6 +173,12 @@ public class BookStore extends BookStoreClientCloseable {
public int filteredStats() {
return BookStoreResponseFilter.getInvocations();
}
+
+ @PUT
+ @Path("/filtered/stats")
+ public void clearStats() {
+ BookStoreResponseFilter.reset();
+ }
@Override
protected Sse getSse() {
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
index 629d7b8..d7abb04 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
@@ -28,6 +28,7 @@ import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
@@ -172,6 +173,12 @@ public class BookStore2 extends BookStoreClientCloseable {
return BookStoreResponseFilter.getInvocations();
}
+ @PUT
+ @Path("/filtered/stats")
+ public void clearStats() {
+ BookStoreResponseFilter.reset();
+ }
+
@Override
protected Sse getSse() {
return sse;
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStoreResponseFilter.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStoreResponseFilter.java
index ee4417e..0dba7d6 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStoreResponseFilter.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStoreResponseFilter.java
@@ -40,7 +40,7 @@ public class BookStoreResponseFilter implements ContainerResponseFilter {
@Override
public void filter(ContainerRequestContext reqContext, ContainerResponseContext rspContext) throws IOException {
- if (uriInfo.getRequestUri().getPath().endsWith("/filtered/sse")) {
+ if (!uriInfo.getRequestUri().getPath().endsWith("/filtered/stats")) {
counter.incrementAndGet();
}
}
@@ -48,4 +48,8 @@ public class BookStoreResponseFilter implements ContainerResponseFilter {
public static int getInvocations() {
return counter.get();
}
+
+ public static void reset() {
+ counter.set(0);
+ }
}