You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2016/07/11 00:14:29 UTC
[24/33] cxf git commit: CXF-5855: Introduce support for Server Sent
Events. Initial implementation based on Atmosphere
http://git-wip-us.apache.org/repos/asf/cxf/blob/690f26a4/distribution/src/main/release/samples/pom.xml
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/pom.xml b/distribution/src/main/release/samples/pom.xml
index d73a7d0..bd7727c 100644
--- a/distribution/src/main/release/samples/pom.xml
+++ b/distribution/src/main/release/samples/pom.xml
@@ -114,6 +114,9 @@
<module>jax_rs/tracing_htrace</module>
<module>clustering/failover_jaxws_osgi</module>
<module>clustering/failover_server</module>
+ <module>jax_rs/sse_cdi</module>
+ <module>jax_rs/sse_tomcat</module>
+ <module>jax_rs/sse_spring</module>
<!--
These are removed from the build as they currently don't inherit the parent from
http://git-wip-us.apache.org/repos/asf/cxf/blob/690f26a4/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index d224073..9bd219c 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -110,7 +110,7 @@
<cxf.geronimo.transaction.version>1.1.1</cxf.geronimo.transaction.version>
<cxf.jasypt.bundle.version>1.9.0_1</cxf.jasypt.bundle.version>
<cxf.javassist.version>3.19.0-GA</cxf.javassist.version>
- <cxf.javax.ws.rs.version>2.0.1</cxf.javax.ws.rs.version>
+ <cxf.javax.ws.rs.version>2.1-SNAPSHOT</cxf.javax.ws.rs.version>
<cxf.jaxb.version>2.2.11</cxf.jaxb.version>
<cxf.jaxb.impl.version>${cxf.jaxb.version}</cxf.jaxb.impl.version>
<cxf.jaxb.core.version>${cxf.jaxb.version}</cxf.jaxb.core.version>
@@ -2185,4 +2185,19 @@
</build>
</profile>
</profiles>
+
+ <!-- Temporarily only till JAX-RS 2.1 artifacts become available -->
+ <repositories>
+ <repository>
+ <id>maven.java.net</id>
+ <name>java.net snapshots</name>
+ <url>https://maven.java.net/content/repositories/snapshots/</url>
+ <releases>
+ <enabled>false</enabled>
+ </releases>
+ <snapshots>
+ <enabled>true</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
</project>
http://git-wip-us.apache.org/repos/asf/cxf/blob/690f26a4/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSServerFactoryBean.java
----------------------------------------------------------------------
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSServerFactoryBean.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSServerFactoryBean.java
index d4e53ac..0f481a3 100644
--- a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSServerFactoryBean.java
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSServerFactoryBean.java
@@ -193,6 +193,7 @@ public class JAXRSServerFactoryBean extends AbstractJAXRSFactoryBean {
checkPrivateEndpoint(ep);
factory.applyDynamicFeatures(getServiceFactory().getClassResourceInfo());
+ applyBusFeatures(getBus());
applyFeatures();
getServiceFactory().sendEvent(FactoryBeanListener.Event.SERVER_CREATED,
@@ -246,6 +247,14 @@ public class JAXRSServerFactoryBean extends AbstractJAXRSFactoryBean {
}
+ protected void applyBusFeatures(final Bus bus) {
+ if (bus.getFeatures() != null) {
+ for (Feature feature : bus.getFeatures()) {
+ feature.initialize(server, bus);
+ }
+ }
+ }
+
protected void applyFeatures() {
if (getFeatures() != null) {
for (Feature feature : getFeatures()) {
http://git-wip-us.apache.org/repos/asf/cxf/blob/690f26a4/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/RequestImpl.java
----------------------------------------------------------------------
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/RequestImpl.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/RequestImpl.java
index 0914854..173284a 100644
--- a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/RequestImpl.java
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/RequestImpl.java
@@ -32,6 +32,9 @@ import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.EntityTag;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.NioCompletionHandler;
+import javax.ws.rs.core.NioErrorHandler;
+import javax.ws.rs.core.NioReaderHandler;
import javax.ws.rs.core.Request;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.ResponseBuilder;
@@ -382,4 +385,30 @@ public class RequestImpl implements Request {
return 0;
}
}
+
+ @Override
+ public void entity(NioReaderHandler arg0) {
+ // TODO: Not Implemented
+ }
+
+
+
+ @Override
+ public void entity(NioReaderHandler arg0, NioCompletionHandler arg1) {
+ // TODO: Not Implemented
+ }
+
+
+
+ @Override
+ public void entity(NioReaderHandler arg0, NioErrorHandler arg1) {
+ // TODO: Not Implemented
+ }
+
+
+
+ @Override
+ public void entity(NioReaderHandler arg0, NioCompletionHandler arg1, NioErrorHandler arg2) {
+ // TODO: Not Implemented
+ }
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/690f26a4/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/ResponseBuilderImpl.java
----------------------------------------------------------------------
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/ResponseBuilderImpl.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/ResponseBuilderImpl.java
index cee5d36..29c5c42 100644
--- a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/ResponseBuilderImpl.java
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/ResponseBuilderImpl.java
@@ -34,6 +34,8 @@ import javax.ws.rs.core.Link;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.NewCookie;
+import javax.ws.rs.core.NioErrorHandler;
+import javax.ws.rs.core.NioWriterHandler;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.UriInfo;
@@ -314,4 +316,16 @@ public class ResponseBuilderImpl extends ResponseBuilder implements Cloneable {
}
return variants(Arrays.asList(variants));
}
+
+ @Override
+ public ResponseBuilder entity(NioWriterHandler arg0) {
+ // TODO: Not Implemented
+ return this;
+ }
+
+ @Override
+ public ResponseBuilder entity(NioWriterHandler arg0, NioErrorHandler arg1) {
+ // TODO: Not Implemented
+ return this;
+ }
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/690f26a4/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/tl/ThreadLocalRequest.java
----------------------------------------------------------------------
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/tl/ThreadLocalRequest.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/tl/ThreadLocalRequest.java
index 8bcdbb6..44a26f9 100644
--- a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/tl/ThreadLocalRequest.java
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/tl/ThreadLocalRequest.java
@@ -23,6 +23,9 @@ import java.util.Date;
import java.util.List;
import javax.ws.rs.core.EntityTag;
+import javax.ws.rs.core.NioCompletionHandler;
+import javax.ws.rs.core.NioErrorHandler;
+import javax.ws.rs.core.NioReaderHandler;
import javax.ws.rs.core.Request;
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.Variant;
@@ -54,4 +57,24 @@ public class ThreadLocalRequest extends AbstractThreadLocalProxy<Request>
return get().evaluatePreconditions();
}
+ @Override
+ public void entity(NioReaderHandler reader) {
+ get().entity(reader);
+ }
+
+ @Override
+ public void entity(NioReaderHandler reader, NioCompletionHandler completion) {
+ get().entity(reader, completion);
+ }
+
+ @Override
+ public void entity(NioReaderHandler reader, NioErrorHandler error) {
+ get().entity(reader, error);
+ }
+
+ @Override
+ public void entity(NioReaderHandler reader, NioCompletionHandler completion, NioErrorHandler error) {
+ get().entity(reader, completion, error);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/690f26a4/rt/rs/pom.xml
----------------------------------------------------------------------
diff --git a/rt/rs/pom.xml b/rt/rs/pom.xml
index a374743..17d3eec 100644
--- a/rt/rs/pom.xml
+++ b/rt/rs/pom.xml
@@ -38,5 +38,6 @@
<module>extensions/providers</module>
<module>extensions/search</module>
<module>security</module>
+ <module>sse</module>
</modules>
</project>
http://git-wip-us.apache.org/repos/asf/cxf/blob/690f26a4/rt/rs/sse/pom.xml
----------------------------------------------------------------------
diff --git a/rt/rs/sse/pom.xml b/rt/rs/sse/pom.xml
new file mode 100644
index 0000000..43e5c66
--- /dev/null
+++ b/rt/rs/sse/pom.xml
@@ -0,0 +1,70 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>cxf-rt-rs-sse</artifactId>
+ <packaging>bundle</packaging>
+ <name>Apache CXF JAX-RS Server-Side Events Support</name>
+ <description>Apache CXF JAX-RS Server-Side Events Support</description>
+ <url>http://cxf.apache.org</url>
+ <parent>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-parent</artifactId>
+ <version>3.2.0-SNAPSHOT</version>
+ <relativePath>../../../parent/pom.xml</relativePath>
+ </parent>
+ <properties>
+ <cxf.osgi.import>
+ javax.servlet*;version="${cxf.osgi.javax.servlet.version}",
+ </cxf.osgi.import>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-frontend-jaxrs</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${cxf.servlet-api.group}</groupId>
+ <artifactId>${cxf.servlet-api.artifact}</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.atmosphere</groupId>
+ <artifactId>atmosphere-runtime</artifactId>
+ <version>${cxf.atmosphere.version}</version>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/cxf/blob/690f26a4/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventBodyWriter.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventBodyWriter.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventBodyWriter.java
new file mode 100644
index 0000000..4a9b3aa
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventBodyWriter.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.sse;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Type;
+import java.nio.charset.StandardCharsets;
+
+import javax.ws.rs.InternalServerErrorException;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.ext.MessageBodyWriter;
+import javax.ws.rs.ext.Provider;
+import javax.ws.rs.sse.OutboundSseEvent;
+
+import org.apache.cxf.jaxrs.provider.ServerProviderFactory;
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+
+@Provider
+public class OutboundSseEventBodyWriter implements MessageBodyWriter<OutboundSseEvent> {
+ public static final String SERVER_SENT_EVENTS = "text/event-stream";
+ public static final MediaType SERVER_SENT_EVENTS_TYPE = MediaType.valueOf(SERVER_SENT_EVENTS);
+
+ private static final byte[] COMMENT = ": ".getBytes(StandardCharsets.UTF_8);
+ private static final byte[] EVENT = " ".getBytes(StandardCharsets.UTF_8);
+ private static final byte[] ID = "id: ".getBytes(StandardCharsets.UTF_8);
+ private static final byte[] RETRY = "retry: ".getBytes(StandardCharsets.UTF_8);
+ private static final byte[] DATA = "data: ".getBytes(StandardCharsets.UTF_8);
+ private static final byte[] NEW_LINE = "\n".getBytes(StandardCharsets.UTF_8);
+
+ private ServerProviderFactory factory;
+ private Message message;
+
+ protected OutboundSseEventBodyWriter() {
+ }
+
+ public OutboundSseEventBodyWriter(final ServerProviderFactory factory, final Exchange exchange) {
+ this.factory = factory;
+ this.message = new MessageImpl();
+ this.message.setExchange(exchange);
+ }
+
+
+ @Override
+ public boolean isWriteable(Class<?> cls, Type type, Annotation[] anns, MediaType mt) {
+ return OutboundSseEvent.class.isAssignableFrom(cls) || SERVER_SENT_EVENTS_TYPE.isCompatible(mt);
+ }
+
+ @Override
+ public void writeTo(OutboundSseEvent p, Class<?> cls, Type t, Annotation[] anns,
+ MediaType mt, MultivaluedMap<String, Object> headers, OutputStream os)
+ throws IOException, WebApplicationException {
+
+ if (p.getName() != null) {
+ os.write(EVENT);
+ os.write(p.getName().getBytes(StandardCharsets.UTF_8));
+ os.write(NEW_LINE);
+ }
+
+ if (p.getId() != null) {
+ os.write(ID);
+ os.write(p.getId().getBytes(StandardCharsets.UTF_8));
+ os.write(NEW_LINE);
+ }
+
+ if (p.getComment() != null) {
+ os.write(COMMENT);
+ os.write(p.getComment().getBytes(StandardCharsets.UTF_8));
+ os.write(NEW_LINE);
+ }
+
+ if (p.getReconnectDelay() > 0) {
+ os.write(RETRY);
+ os.write(Long.toString(p.getReconnectDelay()).getBytes(StandardCharsets.UTF_8));
+ os.write(NEW_LINE);
+ }
+
+ if (p.getData() != null) {
+ Class<?> payloadClass = p.getType();
+ Type payloadType = p.getGenericType();
+ if (payloadType == null) {
+ payloadType = payloadClass;
+ }
+
+ if (payloadType == null && payloadClass == null) {
+ payloadType = Object.class;
+ payloadClass = Object.class;
+ }
+
+ os.write(DATA);
+ writePayloadTo(payloadClass, payloadType, anns, p.getMediaType(), headers, p.getData(), os);
+ os.write(NEW_LINE);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private<T> void writePayloadTo(Class<T> cls, Type type, Annotation[] anns, MediaType mt,
+ MultivaluedMap<String, Object> headers, Object data, OutputStream os)
+ throws IOException, WebApplicationException {
+
+ MessageBodyWriter<T> writer = null;
+ if (message != null && factory != null) {
+ writer = factory.createMessageBodyWriter(cls, type, anns, mt, message);
+ }
+
+ if (writer == null) {
+ throw new InternalServerErrorException("No suitable message body writer for class: " + cls.getName());
+ }
+
+ writer.writeTo((T)data, cls, type, anns, mt, headers, os);
+ }
+
+ @Override
+ public long getSize(OutboundSseEvent t, Class<?> type, Type genericType, Annotation[] annotations,
+ MediaType mediaType) {
+ return -1;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/690f26a4/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventImpl.java
new file mode 100644
index 0000000..f852637
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/OutboundSseEventImpl.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.sse;
+
+import java.lang.reflect.Type;
+
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.sse.OutboundSseEvent;
+
+public class OutboundSseEventImpl implements OutboundSseEvent {
+ private String id;
+ private String name;
+ private String comment;
+ private long reconnectDelay = -1;
+ private Class<?> type;
+ private Type genericType;
+ private MediaType mediaType;
+ private Object data;
+
+ public static class BuilderImpl implements Builder {
+ private String id;
+ private String name;
+ private String comment;
+ private long reconnectDelay = -1;
+ private Class<?> type;
+ private Type genericType;
+ private MediaType mediaType;
+ private Object data;
+
+ @Override
+ public Builder id(String id) {
+ this.id = id;
+ return this;
+ }
+
+ @Override
+ public Builder name(String name) {
+ this.name = name;
+ return this;
+ }
+
+ @Override
+ public Builder reconnectDelay(long milliseconds) {
+ this.reconnectDelay = milliseconds;
+ return this;
+ }
+
+ @Override
+ public Builder mediaType(MediaType mediaType) {
+ this.mediaType = mediaType;
+ return this;
+ }
+
+ @Override
+ public Builder comment(String comment) {
+ this.comment = comment;
+ return this;
+ }
+
+ @Override
+ @SuppressWarnings("rawtypes")
+ public Builder data(Class type, Object data) {
+ this.type = type;
+ this.data= data;
+ return this;
+ }
+
+ @Override
+ @SuppressWarnings("rawtypes")
+ public Builder data(GenericType type, Object data) {
+ this.genericType = type.getType();
+ this.data= data;
+ return this;
+ }
+
+ @Override
+ public Builder data(Object data) {
+ this.data = data;
+ return this;
+ }
+
+ @Override
+ public OutboundSseEvent build() {
+ return new OutboundSseEventImpl(
+ id,
+ name,
+ comment,
+ reconnectDelay,
+ type,
+ genericType,
+ mediaType,
+ data
+ );
+ }
+
+ }
+
+ OutboundSseEventImpl(String id, String name, String comment, long reconnectDelay,
+ Class<?> type, Type genericType, MediaType mediaType, Object data) {
+ this.id = id;
+ this.name = name;
+ this.comment = comment;
+ this.reconnectDelay = reconnectDelay;
+ this.type = type;
+ this.genericType = genericType;
+ this.mediaType = mediaType;
+ this.data = data;
+ }
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public String getComment() {
+ return comment;
+ }
+
+ @Override
+ public long getReconnectDelay() {
+ return reconnectDelay;
+ }
+
+ @Override
+ public boolean isReconnectDelaySet() {
+ return reconnectDelay != -1;
+ }
+
+ @Override
+ public Class<?> getType() {
+ return type;
+ }
+
+ @Override
+ public Type getGenericType() {
+ return genericType;
+ }
+
+ @Override
+ public MediaType getMediaType() {
+ return mediaType;
+ }
+
+ @Override
+ public Object getData() {
+ return data;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/690f26a4/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
new file mode 100644
index 0000000..977a6b2
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.sse;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import javax.ws.rs.sse.OutboundSseEvent;
+import javax.ws.rs.sse.SseBroadcaster;
+import javax.ws.rs.sse.SseEventOutput;
+
+public class SseBroadcasterImpl implements SseBroadcaster {
+ private final Set<SseEventOutput> outputs = new CopyOnWriteArraySet<>();
+ private final Set<Listener> listeners = new CopyOnWriteArraySet<>();
+
+ @Override
+ public boolean register(Listener listener) {
+ return listeners.add(listener);
+ }
+
+ @Override
+ public boolean register(SseEventOutput output) {
+ return outputs.add(output);
+ }
+
+ @Override
+ public void broadcast(OutboundSseEvent event) {
+ for (final SseEventOutput output: outputs) {
+ try {
+ output.write(event);
+ } catch (final IOException ex) {
+ listeners.forEach(listener -> listener.onException(output, ex));
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ for (final SseEventOutput output: outputs) {
+ try {
+ output.close();
+ listeners.forEach(listener -> listener.onClose(output));
+ } catch (final IOException ex) {
+ listeners.forEach(listener -> listener.onException(output, ex));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/690f26a4/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventOutputProvider.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventOutputProvider.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventOutputProvider.java
new file mode 100644
index 0000000..7f7963f
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventOutputProvider.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.sse;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Type;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.ext.MessageBodyWriter;
+import javax.ws.rs.ext.Provider;
+import javax.ws.rs.sse.SseEventOutput;
+
+@Provider
+public class SseEventOutputProvider implements MessageBodyWriter<SseEventOutput> {
+ @Override
+ public boolean isWriteable(Class<?> cls, Type type, Annotation[] anns, MediaType mt) {
+ return SseEventOutput.class.isAssignableFrom(cls);
+ }
+
+ @Override
+ public long getSize(final SseEventOutput output, final Class<?> type, final Type genericType,
+ final Annotation[] annotations, final MediaType mediaType) {
+ return -1;
+ }
+
+ @Override
+ public void writeTo(final SseEventOutput output, final Class<?> type, final Type genericType,
+ final Annotation[] annotations, final MediaType mediaType,
+ final MultivaluedMap<String, Object> httpHeaders, final OutputStream entityStream)
+ throws IOException, WebApplicationException {
+ // do nothing.
+ }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/690f26a4/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java
new file mode 100644
index 0000000..da682a0
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.sse;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.endpoint.Server;
+import org.apache.cxf.feature.AbstractFeature;
+import org.apache.cxf.jaxrs.provider.ServerProviderFactory;
+import org.apache.cxf.jaxrs.sse.atmosphere.SseAtmosphereContextProvider;
+
+public class SseFeature extends AbstractFeature {
+ @Override
+ public void initialize(Server server, Bus bus) {
+ final List<Object> providers = new ArrayList<>();
+
+ providers.add(new SseAtmosphereContextProvider());
+ providers.add(new SseEventOutputProvider());
+
+ ((ServerProviderFactory) server.getEndpoint().get(
+ ServerProviderFactory.class.getName())).setUserProviders(providers);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/690f26a4/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereContextProvider.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereContextProvider.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereContextProvider.java
new file mode 100644
index 0000000..de2c3a9
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereContextProvider.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.sse.atmosphere;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.ext.Provider;
+import javax.ws.rs.sse.SseContext;
+
+import org.apache.cxf.jaxrs.ext.ContextProvider;
+import org.apache.cxf.jaxrs.provider.ServerProviderFactory;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.transport.http.AbstractHTTPDestination;
+import org.atmosphere.cpr.AtmosphereResource;
+import org.atmosphere.cpr.Broadcaster;
+
+@Provider
+public class SseAtmosphereContextProvider implements ContextProvider<SseContext> {
+ @Override
+ public SseContext createContext(Message message) {
+ final HttpServletRequest request = (HttpServletRequest)message.get(AbstractHTTPDestination.HTTP_REQUEST);
+ if (request == null) {
+ throw new IllegalStateException("Unable to retrieve HTTP request from the context");
+ }
+
+ final AtmosphereResource resource = (AtmosphereResource)request
+ .getAttribute(AtmosphereResource.class.getName());
+ if (resource == null) {
+ throw new IllegalStateException("AtmosphereResource is not present, "
+ + "is AtmosphereServlet configured properly?");
+ }
+
+ final Broadcaster broadcaster = resource.getAtmosphereConfig()
+ .getBroadcasterFactory()
+ .lookup(resource.uuid(), true);
+
+ resource.removeFromAllBroadcasters();
+ resource.setBroadcaster(broadcaster);
+
+ return new SseAtmosphereResourceContext(ServerProviderFactory.getInstance(message), resource);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/690f26a4/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventOutputImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventOutputImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventOutputImpl.java
new file mode 100644
index 0000000..dbf15ad
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventOutputImpl.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.sse.atmosphere;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.lang.annotation.Annotation;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.ws.rs.ext.MessageBodyWriter;
+import javax.ws.rs.sse.OutboundSseEvent;
+import javax.ws.rs.sse.SseEventOutput;
+
+import org.atmosphere.cpr.AtmosphereResource;
+import org.atmosphere.cpr.Broadcaster;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SseAtmosphereEventOutputImpl implements SseEventOutput {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SseAtmosphereEventOutputImpl.class);
+
+ private final AtmosphereResource resource;
+ private final MessageBodyWriter<OutboundSseEvent> writer;
+ private volatile boolean closed = false;
+
+ public SseAtmosphereEventOutputImpl(final MessageBodyWriter<OutboundSseEvent> writer,
+ final AtmosphereResource resource) {
+ this.writer = writer;
+ this.resource = resource;
+
+ if (!resource.isSuspended()) {
+ resource.suspend();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!closed) {
+ closed = true;
+
+ if (resource.isSuspended()) {
+ resource.resume();
+ }
+
+ final Broadcaster broadcaster = resource.getBroadcaster();
+ resource.removeFromAllBroadcasters();
+
+ try {
+ if (!resource.getResponse().isCommitted()) {
+ resource.getResponse().flushBuffer();
+ }
+ } finally {
+ resource.close();
+ broadcaster.destroy();
+ }
+ }
+ }
+
+ @Override
+ public void write(OutboundSseEvent event) throws IOException {
+ if (!closed && writer != null) {
+ try (final ByteArrayOutputStream os = new ByteArrayOutputStream()) {
+ writer.writeTo(event, event.getClass(), null, new Annotation [] {}, event.getMediaType(), null, os);
+
+ // Atmosphere broadcasts asynchronously which is acceptable in most cases.
+ // Unfortunately, calling close() may lead to response stream being closed
+ // while there are still some SSE delivery scheduled.
+ final Future<Object> future = resource
+ .getBroadcaster()
+ .broadcast(os.toString(StandardCharsets.UTF_8.name()));
+
+ try {
+ if (!future.isDone()) {
+ // Let us wait at least 200 milliseconds before returning to ensure
+ // that SSE had the opportunity to be delivered.
+ future.get(200, TimeUnit.MILLISECONDS);
+ }
+ } catch (final ExecutionException | InterruptedException ex) {
+ throw new IOException(ex);
+ } catch (final TimeoutException ex) {
+ LOGGER.warn("SSE was not delivered within default timeout");
+ }
+ }
+ }
+ }
+
+ @Override
+ public boolean isClosed() {
+ return closed;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/690f26a4/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptor.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptor.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptor.java
new file mode 100644
index 0000000..3b91c83
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptor.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.sse.atmosphere;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+
+import org.atmosphere.cpr.Action;
+import org.atmosphere.cpr.AsyncIOInterceptorAdapter;
+import org.atmosphere.cpr.AsyncIOWriter;
+import org.atmosphere.cpr.AtmosphereInterceptorWriter;
+import org.atmosphere.cpr.AtmosphereRequest;
+import org.atmosphere.cpr.AtmosphereResource;
+import org.atmosphere.cpr.AtmosphereResourceEvent;
+import org.atmosphere.cpr.AtmosphereResourceEventListenerAdapter.OnPreSuspend;
+import org.atmosphere.cpr.AtmosphereResponse;
+import org.atmosphere.interceptor.AllowInterceptor;
+import org.atmosphere.interceptor.SSEAtmosphereInterceptor;
+import org.atmosphere.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.cxf.jaxrs.sse.OutboundSseEventBodyWriter.SERVER_SENT_EVENTS;
+import static org.atmosphere.cpr.ApplicationConfig.PROPERTY_USE_STREAM;
+import static org.atmosphere.cpr.FrameworkConfig.CALLBACK_JAVASCRIPT_PROTOCOL;
+import static org.atmosphere.cpr.FrameworkConfig.CONTAINER_RESPONSE;
+
+/**
+ * Most of this class implementation is borrowed from SSEAtmosphereInterceptor. The original
+ * implementation does two things which do not fit well into SSE support:
+ * - closes the response stream (overridden by SseAtmosphereInterceptorWriter)
+ * - wraps the whatever object is being written to SSE payload (overridden using
+ * the complete SSE protocol)
+ */
+public class SseAtmosphereInterceptor extends SSEAtmosphereInterceptor {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SseAtmosphereInterceptor.class);
+
+ private static final byte[] PADDING;
+ private static final String PADDING_TEXT;
+ private static final byte[] END = "\r\n\r\n".getBytes();
+
+ static {
+ StringBuffer whitespace = new StringBuffer();
+ for (int i = 0; i < 2000; i++) {
+ whitespace.append(" ");
+ }
+ whitespace.append("\n");
+ PADDING_TEXT = whitespace.toString();
+ PADDING = PADDING_TEXT.getBytes();
+ }
+
+ private boolean writePadding(AtmosphereResponse response) {
+ if (response.request() != null && response.request().getAttribute("paddingWritten") != null) {
+ return false;
+ }
+
+ response.setContentType(SERVER_SENT_EVENTS);
+ response.setCharacterEncoding("utf-8");
+ boolean isUsingStream = (Boolean) response.request().getAttribute(PROPERTY_USE_STREAM);
+ if (isUsingStream) {
+ try {
+ OutputStream stream = response.getResponse().getOutputStream();
+ try {
+ stream.write(PADDING);
+ stream.flush();
+ } catch (IOException ex) {
+ LOGGER.warn("SSE may not work", ex);
+ }
+ } catch (IOException e) {
+ LOGGER.trace("", e);
+ }
+ } else {
+ try {
+ PrintWriter w = response.getResponse().getWriter();
+ w.println(PADDING_TEXT);
+ w.flush();
+ } catch (IOException e) {
+ LOGGER.trace("", e);
+ }
+ }
+ response.resource().getRequest().setAttribute("paddingWritten", "true");
+ return true;
+ }
+
+ @Override
+ public Action inspect(final AtmosphereResource r) {
+ if (Utils.webSocketMessage(r)) {
+ return Action.CONTINUE;
+ }
+
+ final AtmosphereRequest request = r.getRequest();
+ final String accept = request.getHeader("Accept") == null ? "text/plain" : request.getHeader("Accept").trim();
+
+ if (r.transport().equals(AtmosphereResource.TRANSPORT.SSE) || SERVER_SENT_EVENTS.equalsIgnoreCase(accept)) {
+ final AtmosphereResponse response = r.getResponse();
+ if (response.getAsyncIOWriter() == null) {
+ response.asyncIOWriter(new SseAtmosphereInterceptorWriter());
+ }
+
+ r.addEventListener(new P(response));
+
+ AsyncIOWriter writer = response.getAsyncIOWriter();
+ if (AtmosphereInterceptorWriter.class.isAssignableFrom(writer.getClass())) {
+ AtmosphereInterceptorWriter.class.cast(writer).interceptor(new AsyncIOInterceptorAdapter() {
+ private boolean padding() {
+ if (!r.isSuspended()) {
+ return writePadding(response);
+ }
+ return false;
+ }
+
+ @Override
+ public void prePayload(AtmosphereResponse response, byte[] data, int offset, int length) {
+ padding();
+ }
+
+ @Override
+ public void postPayload(AtmosphereResponse response, byte[] data, int offset, int length) {
+ // The CALLBACK_JAVASCRIPT_PROTOCOL may be called by a framework running on top of Atmosphere
+ // In that case, we must pad/protocol indenendently of the state of the AtmosphereResource
+ if (r.isSuspended() || r.getRequest().getAttribute(CALLBACK_JAVASCRIPT_PROTOCOL) != null
+ || r.getRequest().getAttribute(CONTAINER_RESPONSE) != null) {
+ response.write(END, true);
+ }
+
+ /**
+ * When used with https://github.com/remy/polyfills/blob/master/EventSource.js , we
+ * resume after every message.
+ */
+ String ua = r.getRequest().getHeader("User-Agent");
+ if (ua != null && ua.contains("MSIE")) {
+ try {
+ response.flushBuffer();
+ } catch (IOException e) {
+ LOGGER.trace("", e);
+ }
+ r.resume();
+ }
+ }
+ });
+ } else {
+ LOGGER.warn("Unable to apply {}. Your AsyncIOWriter must implement {}",
+ getClass().getName(), AtmosphereInterceptorWriter.class.getName());
+ }
+ }
+
+ return Action.CONTINUE;
+ }
+
+ private final class P extends OnPreSuspend implements AllowInterceptor {
+
+ private final AtmosphereResponse response;
+
+ private P(AtmosphereResponse response) {
+ this.response = response;
+ }
+
+ @Override
+ public void onPreSuspend(AtmosphereResourceEvent event) {
+ writePadding(response);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/690f26a4/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptorWriter.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptorWriter.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptorWriter.java
new file mode 100644
index 0000000..24ebfd9
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereInterceptorWriter.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.sse.atmosphere;
+
+import java.io.IOException;
+
+import org.atmosphere.cpr.AtmosphereInterceptorWriter;
+import org.atmosphere.cpr.AtmosphereResponse;
+
+public class SseAtmosphereInterceptorWriter extends AtmosphereInterceptorWriter {
+ @Override
+ public void close(AtmosphereResponse response) throws IOException {
+ }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/690f26a4/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereResourceContext.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereResourceContext.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereResourceContext.java
new file mode 100644
index 0000000..c330d6c
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereResourceContext.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.sse.atmosphere;
+
+import javax.ws.rs.ext.MessageBodyWriter;
+import javax.ws.rs.sse.OutboundSseEvent;
+import javax.ws.rs.sse.OutboundSseEvent.Builder;
+import javax.ws.rs.sse.SseBroadcaster;
+import javax.ws.rs.sse.SseContext;
+import javax.ws.rs.sse.SseEventOutput;
+
+import org.apache.cxf.jaxrs.provider.ServerProviderFactory;
+import org.apache.cxf.jaxrs.sse.OutboundSseEventBodyWriter;
+import org.apache.cxf.jaxrs.sse.OutboundSseEventImpl;
+import org.apache.cxf.jaxrs.sse.SseBroadcasterImpl;
+import org.apache.cxf.jaxrs.utils.JAXRSUtils;
+import org.atmosphere.cpr.AtmosphereResource;
+
+public class SseAtmosphereResourceContext implements SseContext {
+ private final AtmosphereResource resource;
+ private final ServerProviderFactory factory;
+
+ SseAtmosphereResourceContext(final ServerProviderFactory factory, final AtmosphereResource resource) {
+ this.factory = factory;
+ this.resource = resource;
+ }
+
+ @Override
+ public SseEventOutput newOutput() {
+ final MessageBodyWriter<OutboundSseEvent> writer = new OutboundSseEventBodyWriter(factory,
+ JAXRSUtils.getCurrentMessage().getExchange());
+ return new SseAtmosphereEventOutputImpl(writer, resource);
+ }
+
+ @Override
+ public Builder newEvent() {
+ return new OutboundSseEventImpl.BuilderImpl();
+ }
+
+ @Override
+ public SseBroadcaster newBroadcaster() {
+ return new SseBroadcasterImpl();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/690f26a4/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/servlet/CXFSseServlet.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/servlet/CXFSseServlet.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/servlet/CXFSseServlet.java
new file mode 100644
index 0000000..bc87ebf
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/servlet/CXFSseServlet.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.sse.servlet;
+
+import org.apache.cxf.jaxrs.sse.atmosphere.SseAtmosphereInterceptor;
+import org.apache.cxf.transport.servlet.CXFNonSpringServlet;
+import org.atmosphere.cpr.ApplicationConfig;
+import org.atmosphere.cpr.AtmosphereServlet;
+import org.atmosphere.handler.ReflectorServletProcessor;
+
+public class CXFSseServlet extends AtmosphereServlet {
+ private static final long serialVersionUID = -874047746532165731L;
+
+ public CXFSseServlet(final CXFNonSpringServlet delegate) {
+ // Register and map the dispatcher servlet
+ super(true);
+
+ framework().addAtmosphereHandler("/*", new ReflectorServletProcessor(delegate));
+ framework().interceptor(new SseAtmosphereInterceptor());
+ framework().addInitParameter(ApplicationConfig.PROPERTY_NATIVE_COMETSUPPORT, "true");
+ framework().addInitParameter(ApplicationConfig.WEBSOCKET_SUPPORT, "true");
+ framework().addInitParameter(ApplicationConfig.DISABLE_ATMOSPHEREINTERCEPTOR, "true");
+ framework().addInitParameter(ApplicationConfig.CLOSE_STREAM_ON_CANCEL, "true");
+ }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/690f26a4/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseHttpTransportFactory.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseHttpTransportFactory.java b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseHttpTransportFactory.java
new file mode 100644
index 0000000..af59327
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/SseHttpTransportFactory.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.transport.sse;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.common.injection.NoJSR250Annotations;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.ConduitInitiator;
+import org.apache.cxf.transport.Destination;
+import org.apache.cxf.transport.DestinationFactory;
+import org.apache.cxf.transport.http.DestinationRegistry;
+import org.apache.cxf.transport.http.HTTPTransportFactory;
+import org.apache.cxf.transport.sse.atmosphere.AtmosphereSseServletDestination;
+
+@NoJSR250Annotations
+public class SseHttpTransportFactory extends HTTPTransportFactory
+ implements ConduitInitiator, DestinationFactory {
+
+ public static final String TRANSPORT_ID = "http://cxf.apache.org/transports/http/sse";
+ public static final List<String> DEFAULT_NAMESPACES = Arrays.asList(
+ TRANSPORT_ID,
+ "http://cxf.apache.org/transports/http/sse/configuration"
+ );
+
+ public SseHttpTransportFactory() {
+ this(null);
+ }
+
+ public SseHttpTransportFactory(DestinationRegistry registry) {
+ super(DEFAULT_NAMESPACES, registry);
+ }
+
+ @Override
+ public Destination getDestination(EndpointInfo endpointInfo, Bus bus) throws IOException {
+ return new AtmosphereSseServletDestination(bus, getRegistry(), endpointInfo, endpointInfo.getAddress());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cxf/blob/690f26a4/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/atmosphere/AtmosphereSseServletDestination.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/atmosphere/AtmosphereSseServletDestination.java b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/atmosphere/AtmosphereSseServletDestination.java
new file mode 100644
index 0000000..15e5c9f
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/atmosphere/AtmosphereSseServletDestination.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.sse.atmosphere;
+
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.jaxrs.sse.SseFeature;
+import org.apache.cxf.jaxrs.sse.atmosphere.SseAtmosphereInterceptor;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.http.DestinationRegistry;
+import org.apache.cxf.transport.servlet.ServletDestination;
+import org.atmosphere.cpr.ApplicationConfig;
+import org.atmosphere.cpr.AtmosphereFramework;
+import org.atmosphere.cpr.AtmosphereRequestImpl;
+import org.atmosphere.cpr.AtmosphereResource;
+import org.atmosphere.cpr.AtmosphereResponseImpl;
+import org.atmosphere.handler.AbstractReflectorAtmosphereHandler;
+
+public class AtmosphereSseServletDestination extends ServletDestination {
+ private static final Logger LOG = LogUtils.getL7dLogger(AtmosphereSseServletDestination.class);
+
+ private AtmosphereFramework framework;
+
+ public AtmosphereSseServletDestination(Bus bus, DestinationRegistry registry,
+ EndpointInfo ei, String path) throws IOException {
+ super(bus, registry, ei, path);
+
+ framework = new AtmosphereFramework(true, false);
+ framework.interceptor(new SseAtmosphereInterceptor());
+ framework.addInitParameter(ApplicationConfig.PROPERTY_NATIVE_COMETSUPPORT, "true");
+ framework.addInitParameter(ApplicationConfig.WEBSOCKET_SUPPORT, "true");
+ framework.addInitParameter(ApplicationConfig.DISABLE_ATMOSPHEREINTERCEPTOR, "true");
+ framework.addInitParameter(ApplicationConfig.CLOSE_STREAM_ON_CANCEL, "true");
+ framework.addAtmosphereHandler("/", new DestinationHandler());
+ framework.init();
+
+ bus.getFeatures().add(new SseFeature());
+ }
+
+ @Override
+ public void invoke(ServletConfig config, ServletContext context, HttpServletRequest req,
+ HttpServletResponse resp) throws IOException {
+ try {
+ framework.doCometSupport(AtmosphereRequestImpl.wrap(req), AtmosphereResponseImpl.wrap(resp));
+ } catch (ServletException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ try {
+ framework.destroy();
+ } catch (Exception ex) {
+ LOG.warning("Graceful shutdown was not successful: " + ex.getMessage());
+ } finally {
+ super.shutdown();
+ }
+ }
+
+ private class DestinationHandler extends AbstractReflectorAtmosphereHandler {
+ @Override
+ public void onRequest(final AtmosphereResource resource) throws IOException {
+ LOG.fine("onRequest");
+ try {
+ AtmosphereSseServletDestination.super.invoke(null, resource.getRequest().getServletContext(),
+ resource.getRequest(), resource.getResponse());
+ } catch (Exception e) {
+ LOG.log(Level.WARNING, "Failed to invoke service", e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/690f26a4/rt/rs/sse/src/main/resources/META-INF/cxf/bus-extensions.txt
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/resources/META-INF/cxf/bus-extensions.txt b/rt/rs/sse/src/main/resources/META-INF/cxf/bus-extensions.txt
new file mode 100644
index 0000000..643b51c
--- /dev/null
+++ b/rt/rs/sse/src/main/resources/META-INF/cxf/bus-extensions.txt
@@ -0,0 +1 @@
+org.apache.cxf.transport.sse.SseHttpTransportFactory::true
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cxf/blob/690f26a4/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPTransportFactory.java
----------------------------------------------------------------------
diff --git a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPTransportFactory.java b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPTransportFactory.java
index 706c8c1..e09d549 100644
--- a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPTransportFactory.java
+++ b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPTransportFactory.java
@@ -87,13 +87,19 @@ public class HTTPTransportFactory
public HTTPTransportFactory() {
this(new DestinationRegistryImpl());
}
+
public HTTPTransportFactory(DestinationRegistry registry) {
- super(DEFAULT_NAMESPACES);
+ this(DEFAULT_NAMESPACES, registry);
+ }
+
+ protected HTTPTransportFactory(List<String> transportIds, DestinationRegistry registry) {
+ super(transportIds);
if (registry == null) {
registry = new DestinationRegistryImpl();
}
this.registry = registry;
}
+
public DestinationRegistry getRegistry() {
return registry;
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/690f26a4/rt/transports/http/src/main/java/org/apache/cxf/transport/servlet/CXFNonSpringServlet.java
----------------------------------------------------------------------
diff --git a/rt/transports/http/src/main/java/org/apache/cxf/transport/servlet/CXFNonSpringServlet.java b/rt/transports/http/src/main/java/org/apache/cxf/transport/servlet/CXFNonSpringServlet.java
index fe46f4f..37d56b8 100644
--- a/rt/transports/http/src/main/java/org/apache/cxf/transport/servlet/CXFNonSpringServlet.java
+++ b/rt/transports/http/src/main/java/org/apache/cxf/transport/servlet/CXFNonSpringServlet.java
@@ -38,6 +38,7 @@ import org.apache.cxf.BusException;
import org.apache.cxf.BusFactory;
import org.apache.cxf.common.classloader.ClassLoaderUtils;
import org.apache.cxf.common.classloader.ClassLoaderUtils.ClassLoaderHolder;
+import org.apache.cxf.common.util.StringUtils;
import org.apache.cxf.helpers.CastUtils;
import org.apache.cxf.resource.ResourceManager;
import org.apache.cxf.transport.DestinationFactory;
@@ -48,6 +49,8 @@ import org.apache.cxf.transport.http.HTTPTransportFactory;
import org.apache.cxf.transport.servlet.servicelist.ServiceListGeneratorServlet;
public class CXFNonSpringServlet extends AbstractHTTPServlet {
+ public static final String TRANSPORT_ID = "transportId";
+
private static final long serialVersionUID = -2437897227486327166L;
private static final String IGNORE_SERVLET_CONTEXT_RESOLVER = "ignore.servlet.context.resolver";
@@ -80,7 +83,7 @@ public class CXFNonSpringServlet extends AbstractHTTPServlet {
loader = initClassLoader();
registerServletContextResolver(sc);
if (destinationRegistry == null) {
- this.destinationRegistry = getDestinationRegistryFromBus();
+ this.destinationRegistry = getDestinationRegistryFromBusOrDefault(sc.getInitParameter(TRANSPORT_ID));
}
}
@@ -101,11 +104,12 @@ public class CXFNonSpringServlet extends AbstractHTTPServlet {
return bus.getExtension(ClassLoader.class);
}
- protected DestinationRegistry getDestinationRegistryFromBus() {
+ protected DestinationRegistry getDestinationRegistryFromBusOrDefault(final String transportId) {
DestinationFactoryManager dfm = bus.getExtension(DestinationFactoryManager.class);
try {
- DestinationFactory df = dfm
- .getDestinationFactory("http://cxf.apache.org/transports/http/configuration");
+ DestinationFactory df = StringUtils.isEmpty(transportId)
+ ? dfm.getDestinationFactory("http://cxf.apache.org/transports/http/configuration")
+ : dfm.getDestinationFactory(transportId);
if (df instanceof HTTPTransportFactory) {
HTTPTransportFactory transportFactory = (HTTPTransportFactory)df;
return transportFactory.getRegistry();