You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2017/03/22 01:04:17 UTC
cxf git commit: SSE implementation updates: added
SseUnboundedSubscription, updated SSE/CDI example to use SseBroadcaster
Repository: cxf
Updated Branches:
refs/heads/master 3833280c6 -> a4b98454d
SSE implementation updates: added SseUnboundedSubscription, updated SSE/CDI example to use SseBroadcaster
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/a4b98454
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/a4b98454
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/a4b98454
Branch: refs/heads/master
Commit: a4b98454d147437ff71b92355d60e8bd48d56448
Parents: 3833280
Author: reta <dr...@gmail.com>
Authored: Tue Mar 21 21:03:50 2017 -0400
Committer: reta <dr...@gmail.com>
Committed: Tue Mar 21 21:03:50 2017 -0400
----------------------------------------------------------------------
.../release/samples/jax_rs/sse_cdi/README.txt | 6 +-
.../main/release/samples/jax_rs/sse_cdi/pom.xml | 6 ++
.../demo/jaxrs/sse/StatsRestServiceImpl.java | 56 +++++++--------
.../src/main/resources/web-ui/index.html | 2 +-
.../cxf/jaxrs/sse/SseBroadcasterImpl.java | 26 +++----
.../cxf/jaxrs/sse/SseUnboundedSubscription.java | 72 ++++++++++++++++++++
.../atmosphere/SseAtmosphereEventSinkImpl.java | 2 +-
7 files changed, 117 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cxf/blob/a4b98454/distribution/src/main/release/samples/jax_rs/sse_cdi/README.txt
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/sse_cdi/README.txt b/distribution/src/main/release/samples/jax_rs/sse_cdi/README.txt
index 6917376..7255d9a 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_cdi/README.txt
+++ b/distribution/src/main/release/samples/jax_rs/sse_cdi/README.txt
@@ -3,10 +3,8 @@ JAX-RS SSE Demo
This is a SSE version of JAX-RS Basic Demo using CDI.
-A SSE endpoint service is provided on URL http://localhost:8686/rest/api/stats/sse/{id}
-where {id} is any integer value, f.e.:
-
- http://localhost:8686/rest/api/stats/sse/1
+A SSE endpoint service is provided on URL http://localhost:8686/rest/api/stats/sse which
+is broadcasting the SSE events to all subscribers (using RxJava 2 and SseBroadcaster).
This sample includes a simple web UI using Highcharts JavaScript library to show off
randomly generated statistics about particular server, pushed to the client using
http://git-wip-us.apache.org/repos/asf/cxf/blob/a4b98454/distribution/src/main/release/samples/jax_rs/sse_cdi/pom.xml
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/sse_cdi/pom.xml b/distribution/src/main/release/samples/jax_rs/sse_cdi/pom.xml
index f2d1c8b..aec20ed 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_cdi/pom.xml
+++ b/distribution/src/main/release/samples/jax_rs/sse_cdi/pom.xml
@@ -94,6 +94,12 @@
<groupId>org.atmosphere</groupId>
<artifactId>atmosphere-runtime</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>io.reactivex.rxjava2</groupId>
+ <artifactId>rxjava</artifactId>
+ <version>2.0.7</version>
+ </dependency>
</dependencies>
<profiles>
http://git-wip-us.apache.org/repos/asf/cxf/blob/a4b98454/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java b/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
index 6f48923..0aa943c 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
+++ b/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
@@ -20,60 +20,54 @@ package demo.jaxrs.sse;
import java.util.Date;
import java.util.Random;
+import java.util.concurrent.TimeUnit;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.sse.OutboundSseEvent;
import javax.ws.rs.sse.OutboundSseEvent.Builder;
import javax.ws.rs.sse.Sse;
+import javax.ws.rs.sse.SseBroadcaster;
import javax.ws.rs.sse.SseEventSink;
+import io.reactivex.Emitter;
+import io.reactivex.Flowable;
+import io.reactivex.schedulers.Schedulers;
+
@Path("/stats")
public class StatsRestServiceImpl {
private static final Random RANDOM = new Random();
- private Sse sse;
+
+ private SseBroadcaster broadcaster;
+ private Builder builder;
+
@Context
public void setSse(Sse sse) {
- this.sse = sse;
+ this.broadcaster = sse.newBroadcaster();
+ this.builder = sse.newEventBuilder();
+
+ Flowable
+ .interval(500, TimeUnit.MILLISECONDS)
+ .zipWith(
+ Flowable.generate((Emitter<OutboundSseEvent.Builder> emitter) -> emitter.onNext(builder.name("stats"))),
+ (id, bldr) -> createStatsEvent(bldr, id)
+ )
+ .subscribeOn(Schedulers.single())
+ .subscribe(broadcaster::broadcast);
}
@GET
- @Path("sse/{id}")
+ @Path("sse")
@Produces(MediaType.SERVER_SENT_EVENTS)
- public void stats(@Context SseEventSink sink, @PathParam("id") final String id) {
- new Thread() {
- public void run() {
- try {
- final Builder builder = sse.newEventBuilder();
- sink.onNext(createStatsEvent(builder.name("stats"), 1));
- Thread.sleep(1000);
- sink.onNext(createStatsEvent(builder.name("stats"), 2));
- Thread.sleep(1000);
- sink.onNext(createStatsEvent(builder.name("stats"), 3));
- Thread.sleep(1000);
- sink.onNext(createStatsEvent(builder.name("stats"), 4));
- Thread.sleep(1000);
- sink.onNext(createStatsEvent(builder.name("stats"), 5));
- Thread.sleep(1000);
- sink.onNext(createStatsEvent(builder.name("stats"), 6));
- Thread.sleep(1000);
- sink.onNext(createStatsEvent(builder.name("stats"), 7));
- Thread.sleep(1000);
- sink.onNext(createStatsEvent(builder.name("stats"), 8));
- sink.close();
- } catch (final Exception e) {
- e.printStackTrace();
- }
- }
- }.start();
+ public void stats(@Context SseEventSink sink) {
+ broadcaster.subscribe(sink);
}
- private static OutboundSseEvent createStatsEvent(final OutboundSseEvent.Builder builder, final int eventId) {
+ private static OutboundSseEvent createStatsEvent(final OutboundSseEvent.Builder builder, final long eventId) {
return builder
.id("" + eventId)
.data(Stats.class, new Stats(new Date().getTime(), RANDOM.nextInt(100)))
http://git-wip-us.apache.org/repos/asf/cxf/blob/a4b98454/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/resources/web-ui/index.html
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/resources/web-ui/index.html b/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/resources/web-ui/index.html
index 2aaed6e..d79f804 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/resources/web-ui/index.html
+++ b/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/resources/web-ui/index.html
@@ -31,7 +31,7 @@
});
if( !!window.EventSource ) {
- var event = new EventSource("http://localhost:8686/rest/api/stats/sse/1");
+ var event = new EventSource("http://localhost:8686/rest/api/stats/sse");
event.addEventListener('message', function( event ) {
var datapoint = jQuery.parseJSON( event.data );
http://git-wip-us.apache.org/repos/asf/cxf/blob/a4b98454/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
index ceeaee2..06fb8e3 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
@@ -27,12 +27,11 @@ import java.util.function.Consumer;
import javax.ws.rs.Flow;
import javax.ws.rs.Flow.Subscriber;
-import javax.ws.rs.Flow.Subscription;
import javax.ws.rs.sse.OutboundSseEvent;
import javax.ws.rs.sse.SseBroadcaster;
public class SseBroadcasterImpl implements SseBroadcaster {
- private final Map<Flow.Subscriber<? super OutboundSseEvent>, Subscription> subscribers =
+ private final Map<Flow.Subscriber<? super OutboundSseEvent>, SseUnboundedSubscription> subscribers =
new ConcurrentHashMap<>();
private final Set<Consumer<Subscriber<? super OutboundSseEvent>>> closers =
@@ -43,18 +42,13 @@ public class SseBroadcasterImpl implements SseBroadcaster {
@Override
public void subscribe(Flow.Subscriber<? super OutboundSseEvent> subscriber) {
- final Subscription subscription = new Subscription() {
- public void request(long n) {
- }
-
- @Override
- public void cancel() {
- }
- };
-
try {
- subscriber.onSubscribe(subscription);
- subscribers.put(subscriber, subscription);
+ if (!subscribers.containsKey(subscriber)) {
+ final SseUnboundedSubscription subscription = new SseUnboundedSubscription(subscriber);
+ if (subscribers.putIfAbsent(subscriber, subscription) == null) {
+ subscriber.onSubscribe(subscription);
+ }
+ }
} catch (final Exception ex) {
subscriber.onError(ex);
}
@@ -62,11 +56,11 @@ public class SseBroadcasterImpl implements SseBroadcaster {
@Override
public void broadcast(OutboundSseEvent event) {
- for (final Flow.Subscriber<? super OutboundSseEvent> subscriber: subscribers.keySet()) {
+ for (Map.Entry<Flow.Subscriber<? super OutboundSseEvent>, SseUnboundedSubscription> entry: subscribers.entrySet()) {
try {
- subscriber.onNext(event);
+ entry.getValue().send(event);
} catch (final Exception ex) {
- exceptioners.forEach(exceptioner -> exceptioner.accept(subscriber, ex));
+ exceptioners.forEach(exceptioner -> exceptioner.accept(entry.getKey(), ex));
}
}
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/a4b98454/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseUnboundedSubscription.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseUnboundedSubscription.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseUnboundedSubscription.java
new file mode 100644
index 0000000..7c0cb60
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseUnboundedSubscription.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.sse;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.ws.rs.Flow;
+import javax.ws.rs.Flow.Subscription;
+import javax.ws.rs.sse.OutboundSseEvent;
+
+class SseUnboundedSubscription implements Subscription {
+ // Has subscription been cancelled or not?
+ private boolean cancelled = false;
+ // Current demand: what has been requested but not yet delivered
+ private long demand = 0;
+ private final BlockingQueue<OutboundSseEvent> buffer = new LinkedBlockingQueue<>();
+ private final Flow.Subscriber<? super OutboundSseEvent> subscriber;
+
+ public SseUnboundedSubscription(Flow.Subscriber<? super OutboundSseEvent> subscriber) {
+ this.subscriber = subscriber;
+ }
+
+ public void request(long n) {
+ if (demand + n < 1) {
+ // Effectively unbounded demand
+ demand = Long.MAX_VALUE;
+ send();
+ } else {
+ // Here we record the downstream demand
+ demand += n;
+ send();
+ }
+ }
+
+ @Override
+ public void cancel() {
+ cancelled = true;
+ }
+
+ public void send(OutboundSseEvent event) throws InterruptedException {
+ if (!cancelled && buffer.offer(event)) {
+ send();
+ }
+ }
+
+ private void send() {
+ while (!cancelled && demand > 0 && !buffer.isEmpty()) {
+ final OutboundSseEvent event = buffer.poll();
+ if (event != null) {
+ subscriber.onNext(event);
+ --demand;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/a4b98454/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkImpl.java
index c58e58c..268e9d2 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkImpl.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkImpl.java
@@ -113,7 +113,7 @@ public class SseAtmosphereEventSinkImpl implements SseEventSink {
if (!future.isDone()) {
// Let us wait at least 200 milliseconds before returning to ensure
// that SSE had the opportunity to be delivered.
- LOG.info("Waiting 200ms to ensure SSE Atmosphere response is delivered");
+ LOG.fine("Waiting 200ms to ensure SSE Atmosphere response is delivered");
future.get(200, TimeUnit.MILLISECONDS);
}
} catch (final ExecutionException | InterruptedException ex) {