You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2017/02/13 01:40:10 UTC
cxf git commit: Upgrading to JAX-RS 2.1-m04,
updating SSE server-side implementation to accommodate API changes
Repository: cxf
Updated Branches:
refs/heads/master e1d950605 -> 90a74fc75
Upgrading to JAX-RS 2.1-m04, updating SSE server-side implementation to accommodate API changes
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/90a74fc7
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/90a74fc7
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/90a74fc7
Branch: refs/heads/master
Commit: 90a74fc75af41898ee50359c13f298eb483a3d01
Parents: e1d9506
Author: reta <dr...@gmail.com>
Authored: Sun Feb 12 20:39:39 2017 -0500
Committer: reta <dr...@gmail.com>
Committed: Sun Feb 12 20:39:39 2017 -0500
----------------------------------------------------------------------
.../demo/jaxrs/sse/StatsRestServiceImpl.java | 51 +++----
.../demo/jaxrs/sse/StatsRestServiceImpl.java | 50 +++----
.../demo/jaxrs/sse/StatsRestServiceImpl.java | 51 +++----
parent/pom.xml | 4 +-
.../cxf/jaxrs/sse/SseBroadcasterImpl.java | 70 ++++++---
.../cxf/jaxrs/sse/SseEventOutputProvider.java | 53 -------
.../org/apache/cxf/jaxrs/sse/SseFactory.java | 27 ++++
.../org/apache/cxf/jaxrs/sse/SseFeature.java | 5 +-
.../java/org/apache/cxf/jaxrs/sse/SseImpl.java | 38 +++++
.../SseAtmosphereContextProvider.java | 57 --------
.../SseAtmosphereEventOutputImpl.java | 121 ----------------
.../SseAtmosphereEventSinkContextProvider.java | 61 ++++++++
.../atmosphere/SseAtmosphereEventSinkImpl.java | 144 +++++++++++++++++++
.../SseAtmosphereResourceContext.java | 60 --------
.../AtmosphereSseServletDestination.java | 36 +++++
.../jaxrs/sse/AbstractBroadcasterSseTest.java | 2 +-
.../apache/cxf/systest/jaxrs/sse/BookStore.java | 69 +++++----
systests/rs-sse/src/test/resources/logback.xml | 4 +-
18 files changed, 471 insertions(+), 432 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cxf/blob/90a74fc7/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java b/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
index b8f8608..a156125 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
+++ b/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
@@ -29,52 +29,53 @@ import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.sse.OutboundSseEvent;
-import javax.ws.rs.sse.SseContext;
-import javax.ws.rs.sse.SseEventOutput;
+import javax.ws.rs.sse.OutboundSseEvent.Builder;
+import javax.ws.rs.sse.Sse;
+import javax.ws.rs.sse.SseEventSink;
+
+import org.apache.cxf.jaxrs.sse.SseFactory;
@Path("/stats")
public class StatsRestServiceImpl {
private static final Random RANDOM = new Random();
+ private final Sse sse = SseFactory.create();
- private static OutboundSseEvent createStatsEvent(final OutboundSseEvent.Builder builder, final int eventId) {
- return builder
- .id("" + eventId)
- .data(Stats.class, new Stats(new Date().getTime(), RANDOM.nextInt(100)))
- .mediaType(MediaType.APPLICATION_JSON_TYPE)
- .build();
- }
-
@GET
@Path("sse/{id}")
- @Produces("text/event-stream")
- public SseEventOutput stats(@Context SseContext sseContext, @PathParam("id") final String id) {
- final SseEventOutput output = sseContext.newOutput();
-
+ @Produces(MediaType.SERVER_SENT_EVENTS)
+ public void stats(@Context SseEventSink sink, @PathParam("id") final String id) {
new Thread() {
public void run() {
try {
- output.write(createStatsEvent(sseContext.newEvent().name("stats"), 1));
+ final Builder builder = sse.newEventBuilder();
+ sink.onNext(createStatsEvent(builder.name("stats"), 1));
Thread.sleep(1000);
- output.write(createStatsEvent(sseContext.newEvent().name("stats"), 2));
+ sink.onNext(createStatsEvent(builder.name("stats"), 2));
Thread.sleep(1000);
- output.write(createStatsEvent(sseContext.newEvent().name("stats"), 3));
+ sink.onNext(createStatsEvent(builder.name("stats"), 3));
Thread.sleep(1000);
- output.write(createStatsEvent(sseContext.newEvent().name("stats"), 4));
+ sink.onNext(createStatsEvent(builder.name("stats"), 4));
Thread.sleep(1000);
- output.write(createStatsEvent(sseContext.newEvent().name("stats"), 5));
+ sink.onNext(createStatsEvent(builder.name("stats"), 5));
Thread.sleep(1000);
- output.write(createStatsEvent(sseContext.newEvent().name("stats"), 6));
+ sink.onNext(createStatsEvent(builder.name("stats"), 6));
Thread.sleep(1000);
- output.write(createStatsEvent(sseContext.newEvent().name("stats"), 7));
+ sink.onNext(createStatsEvent(builder.name("stats"), 7));
Thread.sleep(1000);
- output.write(createStatsEvent(sseContext.newEvent().name("stats"), 8));
- output.close();
+ sink.onNext(createStatsEvent(builder.name("stats"), 8));
+ sink.close();
} catch (final InterruptedException | IOException e) {
e.printStackTrace();
}
}
}.start();
-
- return output;
+ }
+
+ private static OutboundSseEvent createStatsEvent(final OutboundSseEvent.Builder builder, final int eventId) {
+ return builder
+ .id("" + eventId)
+ .data(Stats.class, new Stats(new Date().getTime(), RANDOM.nextInt(100)))
+ .mediaType(MediaType.APPLICATION_JSON_TYPE)
+ .build();
}
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/90a74fc7/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java b/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
index 7112228..005c2bf 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
+++ b/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
@@ -29,55 +29,55 @@ import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.sse.OutboundSseEvent;
-import javax.ws.rs.sse.SseContext;
-import javax.ws.rs.sse.SseEventOutput;
+import javax.ws.rs.sse.OutboundSseEvent.Builder;
+import javax.ws.rs.sse.Sse;
+import javax.ws.rs.sse.SseEventSink;
+import org.apache.cxf.jaxrs.sse.SseFactory;
import org.springframework.stereotype.Component;
@Path("/stats")
@Component
public class StatsRestServiceImpl {
private static final Random RANDOM = new Random();
-
- private static OutboundSseEvent createStatsEvent(final OutboundSseEvent.Builder builder, final int eventId) {
- return builder
- .id("" + eventId)
- .data(Stats.class, new Stats(new Date().getTime(), RANDOM.nextInt(100)))
- .mediaType(MediaType.APPLICATION_JSON_TYPE)
- .build();
- }
+ private final Sse sse = SseFactory.create();
@GET
@Path("sse/{id}")
- @Produces("text/event-stream")
- public SseEventOutput stats(@Context SseContext sseContext, @PathParam("id") final String id) {
- final SseEventOutput output = sseContext.newOutput();
-
+ @Produces(MediaType.SERVER_SENT_EVENTS)
+ public void stats(@Context SseEventSink sink, @PathParam("id") final String id) {
new Thread() {
public void run() {
try {
- output.write(createStatsEvent(sseContext.newEvent().name("stats"), 1));
+ final Builder builder = sse.newEventBuilder();
+ sink.onNext(createStatsEvent(builder.name("stats"), 1));
Thread.sleep(1000);
- output.write(createStatsEvent(sseContext.newEvent().name("stats"), 2));
+ sink.onNext(createStatsEvent(builder.name("stats"), 2));
Thread.sleep(1000);
- output.write(createStatsEvent(sseContext.newEvent().name("stats"), 3));
+ sink.onNext(createStatsEvent(builder.name("stats"), 3));
Thread.sleep(1000);
- output.write(createStatsEvent(sseContext.newEvent().name("stats"), 4));
+ sink.onNext(createStatsEvent(builder.name("stats"), 4));
Thread.sleep(1000);
- output.write(createStatsEvent(sseContext.newEvent().name("stats"), 5));
+ sink.onNext(createStatsEvent(builder.name("stats"), 5));
Thread.sleep(1000);
- output.write(createStatsEvent(sseContext.newEvent().name("stats"), 6));
+ sink.onNext(createStatsEvent(builder.name("stats"), 6));
Thread.sleep(1000);
- output.write(createStatsEvent(sseContext.newEvent().name("stats"), 7));
+ sink.onNext(createStatsEvent(builder.name("stats"), 7));
Thread.sleep(1000);
- output.write(createStatsEvent(sseContext.newEvent().name("stats"), 8));
- output.close();
+ sink.onNext(createStatsEvent(builder.name("stats"), 8));
+ sink.close();
} catch (final InterruptedException | IOException e) {
e.printStackTrace();
}
}
}.start();
-
- return output;
+ }
+
+ private static OutboundSseEvent createStatsEvent(final OutboundSseEvent.Builder builder, final int eventId) {
+ return builder
+ .id("" + eventId)
+ .data(Stats.class, new Stats(new Date().getTime(), RANDOM.nextInt(100)))
+ .mediaType(MediaType.APPLICATION_JSON_TYPE)
+ .build();
}
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/90a74fc7/distribution/src/main/release/samples/jax_rs/sse_tomcat/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/sse_tomcat/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java b/distribution/src/main/release/samples/jax_rs/sse_tomcat/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
index b8f8608..f937702 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_tomcat/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
+++ b/distribution/src/main/release/samples/jax_rs/sse_tomcat/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
@@ -29,52 +29,53 @@ import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.sse.OutboundSseEvent;
-import javax.ws.rs.sse.SseContext;
-import javax.ws.rs.sse.SseEventOutput;
+import javax.ws.rs.sse.OutboundSseEvent.Builder;
+import javax.ws.rs.sse.Sse;
+import javax.ws.rs.sse.SseEventSink;
+
+import org.apache.cxf.jaxrs.sse.SseFactory;
@Path("/stats")
public class StatsRestServiceImpl {
private static final Random RANDOM = new Random();
-
- private static OutboundSseEvent createStatsEvent(final OutboundSseEvent.Builder builder, final int eventId) {
- return builder
- .id("" + eventId)
- .data(Stats.class, new Stats(new Date().getTime(), RANDOM.nextInt(100)))
- .mediaType(MediaType.APPLICATION_JSON_TYPE)
- .build();
- }
+ private final Sse sse = SseFactory.create();
@GET
@Path("sse/{id}")
- @Produces("text/event-stream")
- public SseEventOutput stats(@Context SseContext sseContext, @PathParam("id") final String id) {
- final SseEventOutput output = sseContext.newOutput();
-
+ @Produces(MediaType.SERVER_SENT_EVENTS)
+ public void stats(@Context SseEventSink sink, @PathParam("id") final String id) {
new Thread() {
public void run() {
try {
- output.write(createStatsEvent(sseContext.newEvent().name("stats"), 1));
+ final Builder builder = sse.newEventBuilder();
+ sink.onNext(createStatsEvent(builder.name("stats"), 1));
Thread.sleep(1000);
- output.write(createStatsEvent(sseContext.newEvent().name("stats"), 2));
+ sink.onNext(createStatsEvent(builder.name("stats"), 2));
Thread.sleep(1000);
- output.write(createStatsEvent(sseContext.newEvent().name("stats"), 3));
+ sink.onNext(createStatsEvent(builder.name("stats"), 3));
Thread.sleep(1000);
- output.write(createStatsEvent(sseContext.newEvent().name("stats"), 4));
+ sink.onNext(createStatsEvent(builder.name("stats"), 4));
Thread.sleep(1000);
- output.write(createStatsEvent(sseContext.newEvent().name("stats"), 5));
+ sink.onNext(createStatsEvent(builder.name("stats"), 5));
Thread.sleep(1000);
- output.write(createStatsEvent(sseContext.newEvent().name("stats"), 6));
+ sink.onNext(createStatsEvent(builder.name("stats"), 6));
Thread.sleep(1000);
- output.write(createStatsEvent(sseContext.newEvent().name("stats"), 7));
+ sink.onNext(createStatsEvent(builder.name("stats"), 7));
Thread.sleep(1000);
- output.write(createStatsEvent(sseContext.newEvent().name("stats"), 8));
- output.close();
+ sink.onNext(createStatsEvent(builder.name("stats"), 8));
+ sink.close();
} catch (final InterruptedException | IOException e) {
e.printStackTrace();
}
}
}.start();
-
- return output;
+ }
+
+ private static OutboundSseEvent createStatsEvent(final OutboundSseEvent.Builder builder, final int eventId) {
+ return builder
+ .id("" + eventId)
+ .data(Stats.class, new Stats(new Date().getTime(), RANDOM.nextInt(100)))
+ .mediaType(MediaType.APPLICATION_JSON_TYPE)
+ .build();
}
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/90a74fc7/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index fadbd8f..0885fc1 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -70,7 +70,7 @@
<cxf.activemq.version>5.14.2</cxf.activemq.version>
<cxf.ahc.version>1.9.8</cxf.ahc.version>
<cxf.apacheds.version>2.0.0-M23</cxf.apacheds.version>
- <cxf.atmosphere.version>2.4.7</cxf.atmosphere.version>
+ <cxf.atmosphere.version>2.4.9</cxf.atmosphere.version>
<cxf.atmosphere.version.range>[2.4,3.0)</cxf.atmosphere.version.range>
<cxf.axiom.version>1.2.14</cxf.axiom.version>
<cxf.bcprov.version>1.55</cxf.bcprov.version>
@@ -109,7 +109,7 @@
<cxf.geronimo.transaction.version>1.1.1</cxf.geronimo.transaction.version>
<cxf.jasypt.bundle.version>1.9.0_1</cxf.jasypt.bundle.version>
<cxf.javassist.version>3.19.0-GA</cxf.javassist.version>
- <cxf.javax.ws.rs.version>2.1-m03</cxf.javax.ws.rs.version>
+ <cxf.javax.ws.rs.version>2.1-m04</cxf.javax.ws.rs.version>
<cxf.jaxb.version>2.2.11</cxf.jaxb.version>
<cxf.jaxb.impl.version>${cxf.jaxb.version}</cxf.jaxb.impl.version>
<cxf.jaxb.core.version>${cxf.jaxb.version}</cxf.jaxb.core.version>
http://git-wip-us.apache.org/repos/asf/cxf/blob/90a74fc7/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
index 977a6b2..075ae56 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
@@ -18,48 +18,74 @@
*/
package org.apache.cxf.jaxrs.sse;
-import java.io.IOException;
+import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import javax.ws.rs.Flow;
+import javax.ws.rs.Flow.Subscriber;
+import javax.ws.rs.Flow.Subscription;
import javax.ws.rs.sse.OutboundSseEvent;
import javax.ws.rs.sse.SseBroadcaster;
-import javax.ws.rs.sse.SseEventOutput;
public class SseBroadcasterImpl implements SseBroadcaster {
- private final Set<SseEventOutput> outputs = new CopyOnWriteArraySet<>();
- private final Set<Listener> listeners = new CopyOnWriteArraySet<>();
+ private final Map<Flow.Subscriber<? super OutboundSseEvent>, Subscription> subscribers =
+ new ConcurrentHashMap<>();
+
+ private final Set<Consumer<Subscriber<? super OutboundSseEvent>>> closers =
+ new CopyOnWriteArraySet<>();
+
+ private final Set<BiConsumer<Subscriber<? super OutboundSseEvent>, Exception>> exceptioners =
+ new CopyOnWriteArraySet<>();
@Override
- public boolean register(Listener listener) {
- return listeners.add(listener);
- }
+ public void subscribe(Flow.Subscriber<? super OutboundSseEvent> subscriber) {
+ final Subscription subscription = new Subscription() {
+ public void request(long n) {
+ }
+
+ @Override
+ public void cancel() {
+ }
+ };
- @Override
- public boolean register(SseEventOutput output) {
- return outputs.add(output);
+ try {
+ subscriber.onSubscribe(subscription);
+ subscribers.put(subscriber, subscription);
+ } catch (final Exception ex) {
+ subscriber.onError(ex);
+ }
}
@Override
public void broadcast(OutboundSseEvent event) {
- for (final SseEventOutput output: outputs) {
+ for (final Flow.Subscriber<? super OutboundSseEvent> subscriber: subscribers.keySet()) {
try {
- output.write(event);
- } catch (final IOException ex) {
- listeners.forEach(listener -> listener.onException(output, ex));
+ subscriber.onNext(event);
+ } catch (final Exception ex) {
+ exceptioners.forEach(exceptioner -> exceptioner.accept(subscriber, ex));
}
}
}
+
+ @Override
+ public void onClose(Consumer<Subscriber<? super OutboundSseEvent>> subscriber) {
+ closers.add(subscriber);
+ }
+
+ @Override
+ public void onException(BiConsumer<Subscriber<? super OutboundSseEvent>, Exception> exceptioner) {
+ exceptioners.add(exceptioner);
+ }
@Override
public void close() {
- for (final SseEventOutput output: outputs) {
- try {
- output.close();
- listeners.forEach(listener -> listener.onClose(output));
- } catch (final IOException ex) {
- listeners.forEach(listener -> listener.onException(output, ex));
- }
- }
+ subscribers.keySet().forEach(subscriber -> {
+ subscriber.onComplete();
+ closers.forEach(closer -> closer.accept(subscriber));
+ });
}
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/90a74fc7/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventOutputProvider.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventOutputProvider.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventOutputProvider.java
deleted file mode 100644
index 7f7963f..0000000
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseEventOutputProvider.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.sse;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.lang.annotation.Annotation;
-import java.lang.reflect.Type;
-
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.MultivaluedMap;
-import javax.ws.rs.ext.MessageBodyWriter;
-import javax.ws.rs.ext.Provider;
-import javax.ws.rs.sse.SseEventOutput;
-
-@Provider
-public class SseEventOutputProvider implements MessageBodyWriter<SseEventOutput> {
- @Override
- public boolean isWriteable(Class<?> cls, Type type, Annotation[] anns, MediaType mt) {
- return SseEventOutput.class.isAssignableFrom(cls);
- }
-
- @Override
- public long getSize(final SseEventOutput output, final Class<?> type, final Type genericType,
- final Annotation[] annotations, final MediaType mediaType) {
- return -1;
- }
-
- @Override
- public void writeTo(final SseEventOutput output, final Class<?> type, final Type genericType,
- final Annotation[] annotations, final MediaType mediaType,
- final MultivaluedMap<String, Object> httpHeaders, final OutputStream entityStream)
- throws IOException, WebApplicationException {
- // do nothing.
- }
-}
http://git-wip-us.apache.org/repos/asf/cxf/blob/90a74fc7/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFactory.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFactory.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFactory.java
new file mode 100644
index 0000000..36a0e8e
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFactory.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.sse;
+
+import javax.ws.rs.sse.Sse;
+
+public interface SseFactory {
+ static Sse create() {
+ return new SseImpl();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/90a74fc7/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java
index da682a0..2a381ea 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseFeature.java
@@ -25,15 +25,14 @@ import org.apache.cxf.Bus;
import org.apache.cxf.endpoint.Server;
import org.apache.cxf.feature.AbstractFeature;
import org.apache.cxf.jaxrs.provider.ServerProviderFactory;
-import org.apache.cxf.jaxrs.sse.atmosphere.SseAtmosphereContextProvider;
+import org.apache.cxf.jaxrs.sse.atmosphere.SseAtmosphereEventSinkContextProvider;
public class SseFeature extends AbstractFeature {
@Override
public void initialize(Server server, Bus bus) {
final List<Object> providers = new ArrayList<>();
- providers.add(new SseAtmosphereContextProvider());
- providers.add(new SseEventOutputProvider());
+ providers.add(new SseAtmosphereEventSinkContextProvider());
((ServerProviderFactory) server.getEndpoint().get(
ServerProviderFactory.class.getName())).setUserProviders(providers);
http://git-wip-us.apache.org/repos/asf/cxf/blob/90a74fc7/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseImpl.java
new file mode 100644
index 0000000..9327c3f
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseImpl.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.sse;
+
+import javax.ws.rs.sse.OutboundSseEvent.Builder;
+import javax.ws.rs.sse.SseBroadcaster;
+import javax.ws.rs.sse.Sse;
+
+class SseImpl implements Sse {
+ SseImpl() {
+ }
+
+ @Override
+ public Builder newEventBuilder() {
+ return new OutboundSseEventImpl.BuilderImpl();
+ }
+
+ @Override
+ public SseBroadcaster newBroadcaster() {
+ return new SseBroadcasterImpl();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/90a74fc7/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereContextProvider.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereContextProvider.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereContextProvider.java
deleted file mode 100644
index de2c3a9..0000000
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereContextProvider.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.sse.atmosphere;
-
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.ext.Provider;
-import javax.ws.rs.sse.SseContext;
-
-import org.apache.cxf.jaxrs.ext.ContextProvider;
-import org.apache.cxf.jaxrs.provider.ServerProviderFactory;
-import org.apache.cxf.message.Message;
-import org.apache.cxf.transport.http.AbstractHTTPDestination;
-import org.atmosphere.cpr.AtmosphereResource;
-import org.atmosphere.cpr.Broadcaster;
-
-@Provider
-public class SseAtmosphereContextProvider implements ContextProvider<SseContext> {
- @Override
- public SseContext createContext(Message message) {
- final HttpServletRequest request = (HttpServletRequest)message.get(AbstractHTTPDestination.HTTP_REQUEST);
- if (request == null) {
- throw new IllegalStateException("Unable to retrieve HTTP request from the context");
- }
-
- final AtmosphereResource resource = (AtmosphereResource)request
- .getAttribute(AtmosphereResource.class.getName());
- if (resource == null) {
- throw new IllegalStateException("AtmosphereResource is not present, "
- + "is AtmosphereServlet configured properly?");
- }
-
- final Broadcaster broadcaster = resource.getAtmosphereConfig()
- .getBroadcasterFactory()
- .lookup(resource.uuid(), true);
-
- resource.removeFromAllBroadcasters();
- resource.setBroadcaster(broadcaster);
-
- return new SseAtmosphereResourceContext(ServerProviderFactory.getInstance(message), resource);
- }
-}
http://git-wip-us.apache.org/repos/asf/cxf/blob/90a74fc7/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventOutputImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventOutputImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventOutputImpl.java
deleted file mode 100644
index 439c96d..0000000
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventOutputImpl.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.sse.atmosphere;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.lang.annotation.Annotation;
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.logging.Logger;
-
-import javax.ws.rs.ext.MessageBodyWriter;
-import javax.ws.rs.sse.OutboundSseEvent;
-import javax.ws.rs.sse.SseEventOutput;
-
-import org.apache.cxf.common.logging.LogUtils;
-import org.atmosphere.cpr.AtmosphereResource;
-import org.atmosphere.cpr.AtmosphereResponse;
-import org.atmosphere.cpr.Broadcaster;
-
-public class SseAtmosphereEventOutputImpl implements SseEventOutput {
- private static final Logger LOG = LogUtils.getL7dLogger(SseAtmosphereEventOutputImpl.class);
-
- private final AtmosphereResource resource;
- private final MessageBodyWriter<OutboundSseEvent> writer;
- private volatile boolean closed;
-
- public SseAtmosphereEventOutputImpl(final MessageBodyWriter<OutboundSseEvent> writer,
- final AtmosphereResource resource) {
- this.writer = writer;
- this.resource = resource;
-
- if (!resource.isSuspended()) {
- LOG.fine("Atmosphere resource is not suspended, suspending");
- resource.suspend();
- }
- }
-
- @Override
- public void close() throws IOException {
- if (!closed) {
- closed = true;
-
- LOG.fine("Closing Atmosphere SSE event output");
- if (resource.isSuspended()) {
- LOG.fine("Atmosphere resource is suspended, resuming");
- resource.resume();
- }
-
- final Broadcaster broadcaster = resource.getBroadcaster();
- resource.removeFromAllBroadcasters();
-
- try {
- final AtmosphereResponse response = resource.getResponse();
- if (!response.isCommitted()) {
- LOG.fine("Response is not committed, flushing buffer");
- response.flushBuffer();
- }
-
- response.closeStreamOrWriter();
- } finally {
- resource.close();
- broadcaster.destroy();
- LOG.fine("Atmosphere SSE event output is closed");
- }
- }
- }
-
- @Override
- public void write(OutboundSseEvent event) throws IOException {
- if (!closed && writer != null) {
- try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
- writer.writeTo(event, event.getClass(), null, new Annotation [] {}, event.getMediaType(), null, os);
-
- // Atmosphere broadcasts asynchronously which is acceptable in most cases.
- // Unfortunately, calling close() may lead to response stream being closed
- // while there are still some SSE delivery scheduled.
- final Future<Object> future = resource
- .getBroadcaster()
- .broadcast(os.toString(StandardCharsets.UTF_8.name()));
-
- try {
- if (!future.isDone()) {
- // Let us wait at least 200 milliseconds before returning to ensure
- // that SSE had the opportunity to be delivered.
- LOG.info("Waiting 200ms to ensure SSE Atmosphere response is delivered");
- future.get(200, TimeUnit.MILLISECONDS);
- }
- } catch (final ExecutionException | InterruptedException ex) {
- throw new IOException(ex);
- } catch (final TimeoutException ex) {
- LOG.warning("SSE Atmosphere response was not delivered within default timeout");
- }
- }
- }
- }
-
- @Override
- public boolean isClosed() {
- return closed;
- }
-}
http://git-wip-us.apache.org/repos/asf/cxf/blob/90a74fc7/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkContextProvider.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkContextProvider.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkContextProvider.java
new file mode 100644
index 0000000..bbbd754
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkContextProvider.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.sse.atmosphere;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.ext.MessageBodyWriter;
+import javax.ws.rs.sse.OutboundSseEvent;
+import javax.ws.rs.sse.SseEventSink;
+
+import org.apache.cxf.jaxrs.ext.ContextProvider;
+import org.apache.cxf.jaxrs.provider.ServerProviderFactory;
+import org.apache.cxf.jaxrs.sse.OutboundSseEventBodyWriter;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.transport.http.AbstractHTTPDestination;
+import org.atmosphere.cpr.AtmosphereResource;
+import org.atmosphere.cpr.Broadcaster;
+
+public class SseAtmosphereEventSinkContextProvider implements ContextProvider<SseEventSink> {
+ @Override
+ public SseEventSink createContext(Message message) {
+ final HttpServletRequest request = (HttpServletRequest)message.get(AbstractHTTPDestination.HTTP_REQUEST);
+ if (request == null) {
+ throw new IllegalStateException("Unable to retrieve HTTP request from the context");
+ }
+
+ final AtmosphereResource resource = (AtmosphereResource)request
+ .getAttribute(AtmosphereResource.class.getName());
+ if (resource == null) {
+ throw new IllegalStateException("AtmosphereResource is not present, "
+ + "is AtmosphereServlet configured properly?");
+ }
+
+ final Broadcaster broadcaster = resource.getAtmosphereConfig()
+ .getBroadcasterFactory()
+ .lookup(resource.uuid(), true);
+
+ resource.removeFromAllBroadcasters();
+ resource.setBroadcaster(broadcaster);
+
+ final MessageBodyWriter<OutboundSseEvent> writer = new OutboundSseEventBodyWriter(
+ ServerProviderFactory.getInstance(message), message.getExchange());
+
+ return new SseAtmosphereEventSinkImpl(writer, resource);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/90a74fc7/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkImpl.java
new file mode 100644
index 0000000..c930589
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkImpl.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.sse.atmosphere;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.lang.annotation.Annotation;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Logger;
+
+import javax.ws.rs.Flow.Subscription;
+import javax.ws.rs.ext.MessageBodyWriter;
+import javax.ws.rs.sse.OutboundSseEvent;
+import javax.ws.rs.sse.SseEventSink;
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.atmosphere.cpr.AtmosphereResource;
+import org.atmosphere.cpr.AtmosphereResponse;
+import org.atmosphere.cpr.Broadcaster;
+
+public class SseAtmosphereEventSinkImpl implements SseEventSink {
+ private static final Logger LOG = LogUtils.getL7dLogger(SseAtmosphereEventSinkImpl.class);
+
+ private final AtmosphereResource resource;
+ private final MessageBodyWriter<OutboundSseEvent> writer;
+
+ private volatile boolean closed;
+
+ public SseAtmosphereEventSinkImpl(final MessageBodyWriter<OutboundSseEvent> writer,
+ final AtmosphereResource resource) {
+ this.writer = writer;
+ this.resource = resource;
+
+ if (!resource.isSuspended()) {
+ LOG.fine("Atmosphere resource is not suspended, suspending");
+ resource.suspend();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!closed) {
+ closed = true;
+
+ LOG.fine("Closing Atmosphere SSE event output");
+ if (resource.isSuspended()) {
+ LOG.fine("Atmosphere resource is suspended, resuming");
+ resource.resume();
+ }
+
+ final Broadcaster broadcaster = resource.getBroadcaster();
+ resource.removeFromAllBroadcasters();
+
+ try {
+ final AtmosphereResponse response = resource.getResponse();
+ if (!response.isCommitted()) {
+ LOG.fine("Response is not committed, flushing buffer");
+ response.flushBuffer();
+ }
+
+ response.closeStreamOrWriter();
+ } finally {
+ resource.close();
+ broadcaster.destroy();
+ LOG.fine("Atmosphere SSE event output is closed");
+ }
+ }
+ }
+
+ @Override
+ public void onNext(OutboundSseEvent event) {
+ if (!closed && writer != null) {
+ try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
+ writer.writeTo(event, event.getClass(), null, new Annotation [] {}, event.getMediaType(), null, os);
+
+ // Atmosphere broadcasts asynchronously, which is acceptable in most cases.
+ // Unfortunately, calling close() may lead to the response stream being closed
+ // while some SSE deliveries are still scheduled.
+ final Future<Object> future = resource
+ .getBroadcaster()
+ .broadcast(os.toString(StandardCharsets.UTF_8.name()));
+
+ try {
+ if (!future.isDone()) {
+ // Wait up to 200 milliseconds before returning to give the
+ // SSE event the opportunity to be delivered.
+ LOG.info("Waiting 200ms to ensure SSE Atmosphere response is delivered");
+ future.get(200, TimeUnit.MILLISECONDS);
+ }
+ } catch (final ExecutionException | InterruptedException ex) {
+ throw new IOException(ex);
+ } catch (final TimeoutException ex) {
+ LOG.warning("SSE Atmosphere response was not delivered within default timeout");
+ }
+ } catch(final IOException ex) {
+ LOG.warning("While writing the SSE event, an exception was raised: " + ex);
+ }
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ // TODO: Should we close the response?
+ }
+
+ @Override
+ public void onComplete() {
+ try {
+ close();
+ } catch (final IOException ex) {
+ LOG.warning("While closing the SSE connection, an exception was raised: " + ex);
+ }
+ }
+
+ @Override
+ public void onSubscribe(Subscription subscription) {
+ subscription.request(Long.MAX_VALUE);
+ }
+
+ @Override
+ public boolean isClosed() {
+ return closed;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/90a74fc7/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereResourceContext.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereResourceContext.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereResourceContext.java
deleted file mode 100644
index c330d6c..0000000
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereResourceContext.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.sse.atmosphere;
-
-import javax.ws.rs.ext.MessageBodyWriter;
-import javax.ws.rs.sse.OutboundSseEvent;
-import javax.ws.rs.sse.OutboundSseEvent.Builder;
-import javax.ws.rs.sse.SseBroadcaster;
-import javax.ws.rs.sse.SseContext;
-import javax.ws.rs.sse.SseEventOutput;
-
-import org.apache.cxf.jaxrs.provider.ServerProviderFactory;
-import org.apache.cxf.jaxrs.sse.OutboundSseEventBodyWriter;
-import org.apache.cxf.jaxrs.sse.OutboundSseEventImpl;
-import org.apache.cxf.jaxrs.sse.SseBroadcasterImpl;
-import org.apache.cxf.jaxrs.utils.JAXRSUtils;
-import org.atmosphere.cpr.AtmosphereResource;
-
-public class SseAtmosphereResourceContext implements SseContext {
- private final AtmosphereResource resource;
- private final ServerProviderFactory factory;
-
- SseAtmosphereResourceContext(final ServerProviderFactory factory, final AtmosphereResource resource) {
- this.factory = factory;
- this.resource = resource;
- }
-
- @Override
- public SseEventOutput newOutput() {
- final MessageBodyWriter<OutboundSseEvent> writer = new OutboundSseEventBodyWriter(factory,
- JAXRSUtils.getCurrentMessage().getExchange());
- return new SseAtmosphereEventOutputImpl(writer, resource);
- }
-
- @Override
- public Builder newEvent() {
- return new OutboundSseEventImpl.BuilderImpl();
- }
-
- @Override
- public SseBroadcaster newBroadcaster() {
- return new SseBroadcasterImpl();
- }
-}
http://git-wip-us.apache.org/repos/asf/cxf/blob/90a74fc7/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/atmosphere/AtmosphereSseServletDestination.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/atmosphere/AtmosphereSseServletDestination.java b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/atmosphere/AtmosphereSseServletDestination.java
index 37cfc5e..673051a 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/atmosphere/AtmosphereSseServletDestination.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/transport/sse/atmosphere/AtmosphereSseServletDestination.java
@@ -20,6 +20,10 @@
package org.apache.cxf.transport.sse.atmosphere;
import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -28,13 +32,17 @@ import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
import org.apache.cxf.Bus;
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.jaxrs.sse.SseFeature;
import org.apache.cxf.jaxrs.sse.atmosphere.SseAtmosphereInterceptor;
+import org.apache.cxf.message.Message;
import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.transport.http.DestinationRegistry;
+import org.apache.cxf.transport.http.Headers;
import org.apache.cxf.transport.servlet.ServletDestination;
import org.atmosphere.cache.UUIDBroadcasterCache;
import org.atmosphere.cpr.ApplicationConfig;
@@ -99,4 +107,32 @@ public class AtmosphereSseServletDestination extends ServletDestination {
}
}
}
+
+ @Override
+ protected OutputStream flushHeaders(Message outMessage, boolean getStream) throws IOException {
+ adjustContentLength(outMessage);
+ return super.flushHeaders(outMessage, getStream);
+ }
+
+ @Override
+ protected OutputStream flushHeaders(Message outMessage) throws IOException {
+ adjustContentLength(outMessage);
+ return super.flushHeaders(outMessage);
+ }
+
+ /**
+ * It has been noticed that Jetty checks the "Content-Length" header and completes the
+ * response if its value is 0 (or matches the number of bytes written). However, in the case
+ * of SSE the content length is unknown, so we set it to -1 before flushing the
+ * response. Otherwise, only the first event is going to be sent and the response is going to
+ * be closed.
+ */
+ private void adjustContentLength(Message outMessage) {
+ final String contentType = (String)outMessage.get(Message.CONTENT_TYPE);
+
+ if (MediaType.SERVER_SENT_EVENTS.equalsIgnoreCase(contentType)) {
+ final Map<String, List<String>> headers = Headers.getSetProtocolHeaders(outMessage);
+ headers.put(HttpHeaders.CONTENT_LENGTH, Collections.singletonList("-1"));
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/90a74fc7/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractBroadcasterSseTest.java
----------------------------------------------------------------------
diff --git a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractBroadcasterSseTest.java b/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractBroadcasterSseTest.java
index e4fb617..3e2eb96 100644
--- a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractBroadcasterSseTest.java
+++ b/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractBroadcasterSseTest.java
@@ -49,7 +49,7 @@ public abstract class AbstractBroadcasterSseTest extends AbstractSseBaseTest {
createWebClient("/rest/api/bookstore/broadcast/close")
.async()
.post(null)
- .get(5, TimeUnit.SECONDS)
+ .get(10, TimeUnit.SECONDS)
.close();
for (final Future<Response> result: results) {
http://git-wip-us.apache.org/repos/asf/cxf/blob/90a74fc7/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
----------------------------------------------------------------------
diff --git a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java b/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
index 1d5baff..6aa815d 100644
--- a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
+++ b/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
@@ -23,7 +23,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
@@ -36,10 +35,12 @@ import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.sse.OutboundSseEvent;
+import javax.ws.rs.sse.OutboundSseEvent.Builder;
+import javax.ws.rs.sse.Sse;
import javax.ws.rs.sse.SseBroadcaster;
-import javax.ws.rs.sse.SseContext;
-import javax.ws.rs.sse.SseEventOutput;
+import javax.ws.rs.sse.SseEventSink;
+import org.apache.cxf.jaxrs.sse.SseFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,16 +48,12 @@ import org.slf4j.LoggerFactory;
public class BookStore {
private static final Logger LOG = LoggerFactory.getLogger(BookStore.class);
+ private final Sse sse = SseFactory.create();
private final CountDownLatch latch = new CountDownLatch(2);
- private final AtomicReference<SseBroadcaster> broadcaster =
- new AtomicReference<SseBroadcaster>();
+ private final SseBroadcaster broadcaster;
- private static OutboundSseEvent createStatsEvent(final OutboundSseEvent.Builder builder, final int eventId) {
- return builder
- .id(Integer.toString(eventId))
- .data(Book.class, new Book("New Book #" + eventId, eventId))
- .mediaType(MediaType.APPLICATION_JSON_TYPE)
- .build();
+ public BookStore() {
+ broadcaster = sse.newBroadcaster();
}
@GET
@@ -71,50 +68,37 @@ public class BookStore {
@GET
@Path("sse/{id}")
@Produces(MediaType.SERVER_SENT_EVENTS)
- public SseEventOutput forBook(@Context SseContext sseContext, @PathParam("id") final String id,
+ public void forBook(@Context SseEventSink sink, @PathParam("id") final String id,
@HeaderParam(HttpHeaders.LAST_EVENT_ID_HEADER) @DefaultValue("0") final String lastEventId) {
- final SseEventOutput output = sseContext.newOutput();
new Thread() {
public void run() {
try {
final Integer id = Integer.valueOf(lastEventId);
+ final Builder builder = sse.newEventBuilder();
- output.write(createStatsEvent(sseContext.newEvent().name("book"), id + 1));
+ sink.onNext(createStatsEvent(builder.name("book"), id + 1));
Thread.sleep(200);
- output.write(createStatsEvent(sseContext.newEvent().name("book"), id + 2));
+ sink.onNext(createStatsEvent(builder.name("book"), id + 2));
Thread.sleep(200);
- output.write(createStatsEvent(sseContext.newEvent().name("book"), id + 3));
+ sink.onNext(createStatsEvent(builder.name("book"), id + 3));
Thread.sleep(200);
- output.write(createStatsEvent(sseContext.newEvent().name("book"), id + 4));
+ sink.onNext(createStatsEvent(builder.name("book"), id + 4));
Thread.sleep(200);
- output.close();
+ sink.close();
} catch (final InterruptedException | IOException ex) {
LOG.error("Communication error", ex);
}
}
}.start();
-
- return output;
}
@GET
@Path("broadcast/sse")
@Produces(MediaType.SERVER_SENT_EVENTS)
- public SseEventOutput broadcast(@Context SseContext sseContext) {
- final SseEventOutput output = sseContext.newOutput();
-
- if (broadcaster.get() == null) {
- broadcaster.compareAndSet(null, sseContext.newBroadcaster());
- }
-
+ public void broadcast(@Context SseEventSink sink) {
try {
- broadcaster.get().register(output);
-
- broadcaster.get().broadcast(createStatsEvent(sseContext.newEvent().name("book"), 1000));
- broadcaster.get().broadcast(createStatsEvent(sseContext.newEvent().name("book"), 2000));
-
- return output;
+ broadcaster.subscribe(sink);
} finally {
latch.countDown();
}
@@ -125,15 +109,28 @@ public class BookStore {
public void stop() {
try {
// Await a least 2 clients to be broadcasted over
- if (!latch.await(4, TimeUnit.SECONDS)) {
+ if (!latch.await(10, TimeUnit.SECONDS)) {
LOG.warn("Not enough clients have been connected, closing broadcaster anyway");
}
+
+ final Builder builder = sse.newEventBuilder();
+ broadcaster.broadcast(createStatsEvent(builder.name("book"), 1000));
+ broadcaster.broadcast(createStatsEvent(builder.name("book"), 2000));
+
} catch (final InterruptedException ex) {
LOG.error("Wait has been interrupted", ex);
}
- if (broadcaster.get() != null) {
- broadcaster.get().close();
+ if (broadcaster != null) {
+ broadcaster.close();
}
}
+
+ private static OutboundSseEvent createStatsEvent(final OutboundSseEvent.Builder builder, final int eventId) {
+ return builder
+ .id(Integer.toString(eventId))
+ .data(Book.class, new Book("New Book #" + eventId, eventId))
+ .mediaType(MediaType.APPLICATION_JSON_TYPE)
+ .build();
+ }
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/90a74fc7/systests/rs-sse/src/test/resources/logback.xml
----------------------------------------------------------------------
diff --git a/systests/rs-sse/src/test/resources/logback.xml b/systests/rs-sse/src/test/resources/logback.xml
index 73cf1ea..430aa64 100644
--- a/systests/rs-sse/src/test/resources/logback.xml
+++ b/systests/rs-sse/src/test/resources/logback.xml
@@ -6,7 +6,7 @@
</encoder>
</appender>
- <!--<root level="DEBUG">
+ <root level="INFO">
<appender-ref ref="STDOUT" />
- </root>-->
+ </root>
</configuration>