You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2020/02/29 03:18:28 UTC
[cxf] branch master updated: CXF-8215: SSE breaks Pipeline Processing (#643)
This is an automated email from the ASF dual-hosted git repository.
reta pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cxf.git
The following commit(s) were added to refs/heads/master by this push:
new c4aefe5 CXF-8215: SSE breaks Pipeline Processing (#643)
c4aefe5 is described below
commit c4aefe5fec2c94354517f49a55cf0d6efdb98545
Author: Andriy Redko <dr...@gmail.com>
AuthorDate: Fri Feb 28 22:18:17 2020 -0500
CXF-8215: SSE breaks Pipeline Processing (#643)
---
.../cxf/jaxrs/sse/SseEventSinkContextProvider.java | 17 +++++
.../java/org/apache/cxf/jaxrs/sse/SseFeature.java | 7 ++
.../cxf/jaxrs/sse/interceptor/SseInterceptor.java | 82 ++++++++++++++++++++++
.../cxf/transport/sse/SseProvidersExtension.java | 3 +
.../cxf/systest/jaxrs/sse/AbstractSseTest.java | 23 ++++++
.../apache/cxf/systest/jaxrs/sse/BookStore.java | 23 ++++++
.../apache/cxf/systest/jaxrs/sse/BookStore2.java | 23 ++++++
.../systest/jaxrs/sse/BookStoreResponseFilter.java | 51 ++++++++++++++
.../cxf/systest/jaxrs/sse/SseApplication.java | 6 +-
.../jaxrs/sse/jetty/AbstractJettyServer.java | 7 +-
.../jaxrs/sse/tomcat/AbstractTomcatServer.java | 6 +-
.../jaxrs/sse/undertow/AbstractUndertowServer.java | 5 +-
12 files changed, 249 insertions(+), 4 deletions(-)
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventSinkContextProvider.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventSinkContextProvider.java
index e829aa7..ed43f08 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventSinkContextProvider.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventSinkContextProvider.java
@@ -20,6 +20,9 @@
package org.apache.cxf.jaxrs.sse;
+import java.util.ArrayList;
+import java.util.Collection;
+
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.ext.MessageBodyWriter;
@@ -27,9 +30,12 @@ import javax.ws.rs.sse.OutboundSseEvent;
import javax.ws.rs.sse.SseEventSink;
import org.apache.cxf.common.util.PropertyUtils;
+import org.apache.cxf.helpers.CastUtils;
+import org.apache.cxf.interceptor.Interceptor;
import org.apache.cxf.jaxrs.ext.ContextProvider;
import org.apache.cxf.jaxrs.impl.AsyncResponseImpl;
import org.apache.cxf.jaxrs.provider.ServerProviderFactory;
+import org.apache.cxf.jaxrs.sse.interceptor.SseInterceptor;
import org.apache.cxf.message.Message;
import org.apache.cxf.transport.http.AbstractHTTPDestination;
@@ -47,6 +53,17 @@ public class SseEventSinkContextProvider implements ContextProvider<SseEventSink
final AsyncResponse async = new AsyncResponseImpl(message);
final Integer bufferSize = PropertyUtils.getInteger(message, SseEventSinkImpl.BUFFER_SIZE_PROPERTY);
+ final Collection<Interceptor<? extends Message>> interceptors =
+ CastUtils.cast((Collection<?>)message.get(Message.IN_INTERCEPTORS));
+
+ final Collection<Interceptor<? extends Message>> chain = new ArrayList<>();
+ if (interceptors != null) {
+ chain.addAll(interceptors);
+ }
+
+ chain.add(new SseInterceptor());
+ message.put(Message.IN_INTERCEPTORS, chain);
+
if (bufferSize != null) {
return new SseEventSinkImpl(writer, async, request.getAsyncContext(), bufferSize);
} else {
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java
index 7678162..57464ab 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java
@@ -28,7 +28,9 @@ import org.apache.cxf.annotations.Provider.Type;
import org.apache.cxf.endpoint.Server;
import org.apache.cxf.feature.AbstractPortableFeature;
import org.apache.cxf.feature.DelegatingFeature;
+import org.apache.cxf.interceptor.InterceptorProvider;
import org.apache.cxf.jaxrs.provider.ServerProviderFactory;
+import org.apache.cxf.jaxrs.sse.interceptor.SseInterceptor;
@Provider(value = Type.Feature, scope = Scope.Server)
public class SseFeature extends DelegatingFeature<SseFeature.Portable> {
@@ -49,5 +51,10 @@ public class SseFeature extends DelegatingFeature<SseFeature.Portable> {
((ServerProviderFactory) server.getEndpoint().get(
ServerProviderFactory.class.getName())).setUserProviders(providers);
}
+
+ @Override
+ public void doInitializeProvider(InterceptorProvider provider, Bus bus) { // bus is not used here
+ provider.getInInterceptors().add(new SseInterceptor()); // appends a fresh interceptor to the inbound list
+ }
}
}
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/interceptor/SseInterceptor.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/interceptor/SseInterceptor.java
new file mode 100644
index 0000000..8cbdaa1
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/interceptor/SseInterceptor.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.jaxrs.sse.interceptor;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.Response;
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.jaxrs.impl.ResponseImpl;
+import org.apache.cxf.jaxrs.model.OperationResourceInfo;
+import org.apache.cxf.jaxrs.provider.ServerProviderFactory;
+import org.apache.cxf.jaxrs.utils.JAXRSUtils;
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.phase.AbstractPhaseInterceptor;
+import org.apache.cxf.phase.Phase;
+import org.apache.cxf.transport.http.AbstractHTTPDestination;
+
+/** Runs JAX-RS container response filters on a response rebuilt from the servlet response, once per message. */
+public class SseInterceptor extends AbstractPhaseInterceptor<Message> {
+    private static final Logger LOG = LogUtils.getL7dLogger(SseInterceptor.class);
+
+    public SseInterceptor() {
+        super(Phase.POST_LOGICAL); // phase used when the caller does not pick one
+    }
+    public SseInterceptor(String phase) {
+        super(phase);
+    }
+
+    @Override
+    public void handleMessage(Message message) {
+        if (!isRequestor(message) && message.get(SseInterceptor.class) == null) { // non-requestor, first pass only
+            message.put(SseInterceptor.class, this); // marker that prevents a second run for this message
+            final Exchange exchange = message.getExchange();
+            OperationResourceInfo ori = (OperationResourceInfo)exchange.get(OperationResourceInfo.class.getName());
+            if (ori != null) {
+                Response.ResponseBuilder builder = Response.ok(); // fallback when no servlet response is present
+                final ServerProviderFactory providerFactory = ServerProviderFactory.getInstance(message);
+                final Object response = message.get(AbstractHTTPDestination.HTTP_RESPONSE);
+                if (response instanceof HttpServletResponse) {
+                    final HttpServletResponse servletResponse = (HttpServletResponse)response;
+                    builder = Response.status(servletResponse.getStatus());
+                    for (final String header: servletResponse.getHeaderNames()) {
+                        for (final String value: servletResponse.getHeaders(header)) { // one entry per value
+                            builder.header(header, value);
+                        }
+                    }
+                }
+                // Run the filters; any failure is swallowed and only logged at FINE
+                try {
+                    final ResponseImpl responseImpl = (ResponseImpl)builder.build();
+                    JAXRSUtils.runContainerResponseFilters(providerFactory, responseImpl,
+                        message, ori, ori.getAnnotatedMethod());
+                } catch (Throwable ex) {
+                    if (LOG.isLoggable(Level.FINE)) {
+                        LOG.log(Level.FINE, ex.getMessage(), ex);
+                    }
+                }
+            }
+        }
+    }
+}
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseProvidersExtension.java b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseProvidersExtension.java
index 0277611..268db76 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseProvidersExtension.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseProvidersExtension.java
@@ -27,6 +27,7 @@ import org.apache.cxf.buslifecycle.BusCreationListener;
import org.apache.cxf.helpers.CastUtils;
import org.apache.cxf.jaxrs.sse.SseContextProvider;
import org.apache.cxf.jaxrs.sse.SseEventSinkContextProvider;
+import org.apache.cxf.jaxrs.sse.interceptor.SseInterceptor;
public class SseProvidersExtension implements BusCreationListener {
@@ -49,6 +50,8 @@ public class SseProvidersExtension implements BusCreationListener {
} else {
bus.setProperty(BUS_PROVIDERS, sseProviders);
}
+
+ bus.getInInterceptors().add(new SseInterceptor());
}
}
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
index e0077a9..e79571b 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java
@@ -107,6 +107,29 @@ public abstract class AbstractSseTest extends AbstractSseBaseTest {
// Easing the test verification here, it does not work well for Atm + Jetty
assertTrue(books.isEmpty());
}
+
+ @Test
+ public void testContainerResponseFilterIsCalled() throws InterruptedException {
+ final WebTarget target = createWebTarget("/rest/api/bookstore/filtered/sse");
+ final Collection<Book> books = new ArrayList<>();
+
+ assertThat(createWebTarget("/rest/api/bookstore/filtered/stats")
+ .request()
+ .get(Integer.class), equalTo(0)); // baseline: the stats endpoint reports no invocations yet
+
+ try (SseEventSource eventSource = SseEventSource.target(target).build()) {
+ eventSource.register(collect(books), System.out::println); // second argument prints errors to stdout
+ eventSource.open();
+ // Give the SSE stream some time to collect all events
+ Thread.sleep(1000);
+ }
+ // Easing the test verification here, it does not work well for Atm + Jetty
+ assertTrue(books.isEmpty());
+
+ assertThat(createWebTarget("/rest/api/bookstore/filtered/stats")
+ .request()
+ .get(Integer.class), equalTo(1)); // the single SSE request must have been counted exactly once
+ }
@Test
public void testBooksStreamIsReconnectedFromInboundSseEvents() throws InterruptedException {
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
index a32f731..8978382 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
@@ -149,7 +149,30 @@ public class BookStore extends BookStoreClientCloseable {
LOG.error("Wait has been interrupted", ex);
}
}
+
+ @GET
+ @Path("/filtered/sse")
+ @Produces(MediaType.SERVER_SENT_EVENTS)
+ public void filtered(@Context SseEventSink sink) { // sends no events; only closes the sink from a new thread
+ new Thread() {
+ public void run() {
+ try {
+ Thread.sleep(200); // wait 200 ms before closing the sink
+ sink.close();
+ } catch (final InterruptedException ex) {
+ LOG.error("Communication error", ex); // NOTE(review): reached when the sleep is interrupted; sink is not closed
+ }
+ }
+ }.start();
+ }
+ @GET
+ @Path("/filtered/stats")
+ @Produces(MediaType.TEXT_PLAIN)
+ public int filteredStats() { // exposes the filter's invocation count as plain text
+ return BookStoreResponseFilter.getInvocations();
+ }
+
@Override
protected Sse getSse() {
return sse;
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
index d4f0158..629d7b8 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
@@ -148,7 +148,30 @@ public class BookStore2 extends BookStoreClientCloseable {
LOG.error("Wait has been interrupted", ex);
}
}
+
+ @GET
+ @Path("/filtered/sse")
+ @Produces(MediaType.SERVER_SENT_EVENTS)
+ public void filtered(@Context SseEventSink sink) { // sends no events; only closes the sink from a new thread
+ new Thread() {
+ public void run() {
+ try {
+ Thread.sleep(200); // wait 200 ms before closing the sink
+ sink.close();
+ } catch (final InterruptedException ex) {
+ LOG.error("Communication error", ex); // NOTE(review): reached when the sleep is interrupted; sink is not closed
+ }
+ }
+ }.start();
+ }
+ @GET
+ @Path("/filtered/stats")
+ @Produces(MediaType.TEXT_PLAIN)
+ public int filteredStats() { // exposes the filter's invocation count as plain text
+ return BookStoreResponseFilter.getInvocations();
+ }
+
@Override
protected Sse getSse() {
return sse;
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStoreResponseFilter.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStoreResponseFilter.java
new file mode 100644
index 0000000..ee4417e
--- /dev/null
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/BookStoreResponseFilter.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.sse;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.ws.rs.container.ContainerRequestContext;
+import javax.ws.rs.container.ContainerResponseContext;
+import javax.ws.rs.container.ContainerResponseFilter;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.UriInfo;
+import javax.ws.rs.ext.Provider;
+
+@Provider // test filter: counts invocations for requests whose path ends with "/filtered/sse"
+public class BookStoreResponseFilter implements ContainerResponseFilter {
+    private static final AtomicInteger COUNTER = new AtomicInteger(0); // shared by every filter instance
+    @Context private UriInfo uriInfo;
+
+    public BookStoreResponseFilter() {
+        COUNTER.set(0); // each newly created filter restarts the shared count
+    }
+
+    @Override
+    public void filter(ContainerRequestContext reqContext, ContainerResponseContext rspContext) throws IOException {
+        if (uriInfo.getRequestUri().getPath().endsWith("/filtered/sse")) { // other endpoints are not counted
+            COUNTER.incrementAndGet();
+        }
+    }
+
+    public static int getInvocations() { // matching invocations since the last constructor call
+        return COUNTER.get();
+    }
+}
diff --git a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/SseApplication.java b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/SseApplication.java
index a4a4049..69efb72 100644
--- a/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/SseApplication.java
+++ b/systests/rs-sse/rs-sse-base/src/main/java/org/apache/cxf/systest/jaxrs/sse/SseApplication.java
@@ -18,7 +18,9 @@
*/
package org.apache.cxf.systest.jaxrs.sse;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashSet;
import java.util.Set;
import javax.ws.rs.core.Application;
@@ -33,6 +35,8 @@ public class SseApplication extends Application {
@Override
public Set<Object> getSingletons() {
- return Collections.singleton(new JacksonJsonProvider());
+ return new HashSet<>(Arrays.asList(
+ new JacksonJsonProvider(),
+ new BookStoreResponseFilter()));
}
}
diff --git a/systests/rs-sse/rs-sse-jetty/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/AbstractJettyServer.java b/systests/rs-sse/rs-sse-jetty/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/AbstractJettyServer.java
index 5912e4f..8e7f456 100644
--- a/systests/rs-sse/rs-sse-jetty/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/AbstractJettyServer.java
+++ b/systests/rs-sse/rs-sse-jetty/src/test/java/org/apache/cxf/systest/jaxrs/sse/jetty/AbstractJettyServer.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
import org.apache.cxf.jaxrs.servlet.CXFNonSpringJaxrsServlet;
import org.apache.cxf.systest.jaxrs.sse.BookStore;
+import org.apache.cxf.systest.jaxrs.sse.BookStoreResponseFilter;
import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
@@ -59,7 +60,11 @@ public abstract class AbstractJettyServer extends AbstractBusTestServerBase {
// Register and map the dispatcher servlet
final ServletHolder holder = new ServletHolder(new CXFNonSpringJaxrsServlet());
holder.setInitParameter("jaxrs.serviceClasses", BookStore.class.getName());
- holder.setInitParameter("jaxrs.providers", JacksonJsonProvider.class.getName());
+ holder.setInitParameter("jaxrs.providers", String.join(",",
+ JacksonJsonProvider.class.getName(),
+ BookStoreResponseFilter.class.getName()
+ ));
+
final ServletContextHandler context = new ServletContextHandler();
context.setContextPath(contextPath);
context.addServlet(holder, "/rest/*");
diff --git a/systests/rs-sse/rs-sse-tomcat/src/test/java/org/apache/cxf/systest/jaxrs/sse/tomcat/AbstractTomcatServer.java b/systests/rs-sse/rs-sse-tomcat/src/test/java/org/apache/cxf/systest/jaxrs/sse/tomcat/AbstractTomcatServer.java
index acb8106..08e720a 100644
--- a/systests/rs-sse/rs-sse-tomcat/src/test/java/org/apache/cxf/systest/jaxrs/sse/tomcat/AbstractTomcatServer.java
+++ b/systests/rs-sse/rs-sse-tomcat/src/test/java/org/apache/cxf/systest/jaxrs/sse/tomcat/AbstractTomcatServer.java
@@ -29,6 +29,7 @@ import org.apache.catalina.Wrapper;
import org.apache.catalina.startup.Tomcat;
import org.apache.cxf.jaxrs.servlet.CXFNonSpringJaxrsServlet;
import org.apache.cxf.systest.jaxrs.sse.BookStore;
+import org.apache.cxf.systest.jaxrs.sse.BookStoreResponseFilter;
import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
import static org.junit.Assert.fail;
@@ -62,7 +63,10 @@ public abstract class AbstractTomcatServer extends AbstractBusTestServerBase {
final Context context = server.addContext("/", base.getAbsolutePath());
final Wrapper cxfServlet = Tomcat.addServlet(context, "cxfServlet", new CXFNonSpringJaxrsServlet());
cxfServlet.addInitParameter("jaxrs.serviceClasses", BookStore.class.getName());
- cxfServlet.addInitParameter("jaxrs.providers", JacksonJsonProvider.class.getName());
+ cxfServlet.addInitParameter("jaxrs.providers", String.join(",",
+ JacksonJsonProvider.class.getName(),
+ BookStoreResponseFilter.class.getName()
+ ));
cxfServlet.setAsyncSupported(true);
context.addServletMappingDecoded("/rest/*", "cxfServlet");
} else {
diff --git a/systests/rs-sse/rs-sse-undertow/src/test/java/org/apache/cxf/systest/jaxrs/sse/undertow/AbstractUndertowServer.java b/systests/rs-sse/rs-sse-undertow/src/test/java/org/apache/cxf/systest/jaxrs/sse/undertow/AbstractUndertowServer.java
index 301f9a1..87af8b8 100644
--- a/systests/rs-sse/rs-sse-undertow/src/test/java/org/apache/cxf/systest/jaxrs/sse/undertow/AbstractUndertowServer.java
+++ b/systests/rs-sse/rs-sse-undertow/src/test/java/org/apache/cxf/systest/jaxrs/sse/undertow/AbstractUndertowServer.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
import org.apache.cxf.jaxrs.servlet.CXFNonSpringJaxrsServlet;
import org.apache.cxf.systest.jaxrs.sse.BookStore;
+import org.apache.cxf.systest.jaxrs.sse.BookStoreResponseFilter;
import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
import io.undertow.Handlers;
@@ -54,7 +55,9 @@ public abstract class AbstractUndertowServer extends AbstractBusTestServerBase {
.setDeploymentName("sse-test")
.addServlets(
servlet("MessageServlet", CXFNonSpringJaxrsServlet.class)
- .addInitParam("jaxrs.providers", JacksonJsonProvider.class.getName())
+ .addInitParam("jaxrs.providers", String.join(",",
+ JacksonJsonProvider.class.getName(),
+ BookStoreResponseFilter.class.getName()))
.addInitParam("jaxrs.serviceClasses", BookStore.class.getName())
.setAsyncSupported(true)
.setLoadOnStartup(1)