You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2017/03/22 01:04:17 UTC

cxf git commit: SSE implementation updates: added SseUnboundedSubscription, updated SSE/CDI example to use SseBroadcaster

Repository: cxf
Updated Branches:
  refs/heads/master 3833280c6 -> a4b98454d


SSE implementation updates: added SseUnboundedSubscription, updated SSE/CDI example to use SseBroadcaster


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/a4b98454
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/a4b98454
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/a4b98454

Branch: refs/heads/master
Commit: a4b98454d147437ff71b92355d60e8bd48d56448
Parents: 3833280
Author: reta <dr...@gmail.com>
Authored: Tue Mar 21 21:03:50 2017 -0400
Committer: reta <dr...@gmail.com>
Committed: Tue Mar 21 21:03:50 2017 -0400

----------------------------------------------------------------------
 .../release/samples/jax_rs/sse_cdi/README.txt   |  6 +-
 .../main/release/samples/jax_rs/sse_cdi/pom.xml |  6 ++
 .../demo/jaxrs/sse/StatsRestServiceImpl.java    | 56 +++++++--------
 .../src/main/resources/web-ui/index.html        |  2 +-
 .../cxf/jaxrs/sse/SseBroadcasterImpl.java       | 26 +++----
 .../cxf/jaxrs/sse/SseUnboundedSubscription.java | 72 ++++++++++++++++++++
 .../atmosphere/SseAtmosphereEventSinkImpl.java  |  2 +-
 7 files changed, 117 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/a4b98454/distribution/src/main/release/samples/jax_rs/sse_cdi/README.txt
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/sse_cdi/README.txt b/distribution/src/main/release/samples/jax_rs/sse_cdi/README.txt
index 6917376..7255d9a 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_cdi/README.txt
+++ b/distribution/src/main/release/samples/jax_rs/sse_cdi/README.txt
@@ -3,10 +3,8 @@ JAX-RS SSE Demo
 
 This is a SSE version of JAX-RS Basic Demo using CDI.
 
-A SSE endpoint service is provided on URL http://localhost:8686/rest/api/stats/sse/{id}
-where {id} is any integer value, f.e.:
-
-  http://localhost:8686/rest/api/stats/sse/1
+An SSE endpoint service is provided at the URL http://localhost:8686/rest/api/stats/sse, which
+broadcasts SSE events to all subscribers (using RxJava 2 and SseBroadcaster).
 
 This sample includes a simple web UI using Highcharts JavaScript library to show off
 randomly generated statistics about particular server, pushed to the client using

http://git-wip-us.apache.org/repos/asf/cxf/blob/a4b98454/distribution/src/main/release/samples/jax_rs/sse_cdi/pom.xml
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/sse_cdi/pom.xml b/distribution/src/main/release/samples/jax_rs/sse_cdi/pom.xml
index f2d1c8b..aec20ed 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_cdi/pom.xml
+++ b/distribution/src/main/release/samples/jax_rs/sse_cdi/pom.xml
@@ -94,6 +94,12 @@
             <groupId>org.atmosphere</groupId>
             <artifactId>atmosphere-runtime</artifactId>
         </dependency>
+        
+        <dependency>
+            <groupId>io.reactivex.rxjava2</groupId>
+            <artifactId>rxjava</artifactId>
+            <version>2.0.7</version>
+        </dependency>
     </dependencies>
     
     <profiles>

http://git-wip-us.apache.org/repos/asf/cxf/blob/a4b98454/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java b/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
index 6f48923..0aa943c 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
+++ b/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
@@ -20,60 +20,54 @@ package demo.jaxrs.sse;
 
 import java.util.Date;
 import java.util.Random;
+import java.util.concurrent.TimeUnit;
 
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.sse.OutboundSseEvent;
 import javax.ws.rs.sse.OutboundSseEvent.Builder;
 import javax.ws.rs.sse.Sse;
+import javax.ws.rs.sse.SseBroadcaster;
 import javax.ws.rs.sse.SseEventSink;
 
+import io.reactivex.Emitter;
+import io.reactivex.Flowable;
+import io.reactivex.schedulers.Schedulers;
+
 @Path("/stats")
 public class StatsRestServiceImpl {
     private static final Random RANDOM = new Random();
-    private Sse sse;
+
+    private SseBroadcaster broadcaster;
+    private Builder builder;
+
 
     @Context 
     public void setSse(Sse sse) {
-        this.sse = sse;
+        this.broadcaster = sse.newBroadcaster();
+        this.builder = sse.newEventBuilder();
+        
+        Flowable
+            .interval(500, TimeUnit.MILLISECONDS)
+            .zipWith(
+                Flowable.generate((Emitter<OutboundSseEvent.Builder> emitter) -> emitter.onNext(builder.name("stats"))),
+                (id, bldr) -> createStatsEvent(bldr, id)
+            )
+            .subscribeOn(Schedulers.single())
+            .subscribe(broadcaster::broadcast);
     }
     
     @GET
-    @Path("sse/{id}")
+    @Path("sse")
     @Produces(MediaType.SERVER_SENT_EVENTS)
-    public void stats(@Context SseEventSink sink, @PathParam("id") final String id) {
-        new Thread() {
-            public void run() {
-                try {
-                    final Builder builder = sse.newEventBuilder();
-                    sink.onNext(createStatsEvent(builder.name("stats"), 1));
-                    Thread.sleep(1000);
-                    sink.onNext(createStatsEvent(builder.name("stats"), 2));
-                    Thread.sleep(1000);
-                    sink.onNext(createStatsEvent(builder.name("stats"), 3));
-                    Thread.sleep(1000);
-                    sink.onNext(createStatsEvent(builder.name("stats"), 4));
-                    Thread.sleep(1000);
-                    sink.onNext(createStatsEvent(builder.name("stats"), 5));
-                    Thread.sleep(1000);
-                    sink.onNext(createStatsEvent(builder.name("stats"), 6));
-                    Thread.sleep(1000);
-                    sink.onNext(createStatsEvent(builder.name("stats"), 7));
-                    Thread.sleep(1000);
-                    sink.onNext(createStatsEvent(builder.name("stats"), 8));
-                    sink.close();
-                } catch (final Exception e) {
-                    e.printStackTrace();
-                }
-            }
-        }.start();
+    public void stats(@Context SseEventSink sink) {
+        broadcaster.subscribe(sink);
     }
 
-    private static OutboundSseEvent createStatsEvent(final OutboundSseEvent.Builder builder, final int eventId) {
+    private static OutboundSseEvent createStatsEvent(final OutboundSseEvent.Builder builder, final long eventId) {
         return builder
             .id("" + eventId)
             .data(Stats.class, new Stats(new Date().getTime(), RANDOM.nextInt(100)))

http://git-wip-us.apache.org/repos/asf/cxf/blob/a4b98454/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/resources/web-ui/index.html
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/resources/web-ui/index.html b/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/resources/web-ui/index.html
index 2aaed6e..d79f804 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/resources/web-ui/index.html
+++ b/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/resources/web-ui/index.html
@@ -31,7 +31,7 @@
 	}); 
 
 	if( !!window.EventSource ) {
-	    var event = new EventSource("http://localhost:8686/rest/api/stats/sse/1");
+	    var event = new EventSource("http://localhost:8686/rest/api/stats/sse");
 	
 	    event.addEventListener('message', function( event ) {	
 	    	var datapoint = jQuery.parseJSON( event.data );

http://git-wip-us.apache.org/repos/asf/cxf/blob/a4b98454/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
index ceeaee2..06fb8e3 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
@@ -27,12 +27,11 @@ import java.util.function.Consumer;
 
 import javax.ws.rs.Flow;
 import javax.ws.rs.Flow.Subscriber;
-import javax.ws.rs.Flow.Subscription;
 import javax.ws.rs.sse.OutboundSseEvent;
 import javax.ws.rs.sse.SseBroadcaster;
 
 public class SseBroadcasterImpl implements SseBroadcaster {
-    private final Map<Flow.Subscriber<? super OutboundSseEvent>, Subscription> subscribers =
+    private final Map<Flow.Subscriber<? super OutboundSseEvent>, SseUnboundedSubscription> subscribers =
             new ConcurrentHashMap<>();
 
     private final Set<Consumer<Subscriber<? super OutboundSseEvent>>> closers =
@@ -43,18 +42,13 @@ public class SseBroadcasterImpl implements SseBroadcaster {
 
     @Override
     public void subscribe(Flow.Subscriber<? super OutboundSseEvent> subscriber) {
-        final Subscription subscription = new Subscription() {
-            public void request(long n) {
-            }
-
-            @Override
-            public void cancel() {
-            }
-        };
-
         try {
-            subscriber.onSubscribe(subscription);
-            subscribers.put(subscriber, subscription);
+            if (!subscribers.containsKey(subscriber)) {
+                final SseUnboundedSubscription subscription = new SseUnboundedSubscription(subscriber);
+                if (subscribers.putIfAbsent(subscriber, subscription) == null) {
+                    subscriber.onSubscribe(subscription);
+                }
+            }
         } catch (final Exception ex) {
             subscriber.onError(ex);
         }
@@ -62,11 +56,11 @@ public class SseBroadcasterImpl implements SseBroadcaster {
 
     @Override
     public void broadcast(OutboundSseEvent event) {
-        for (final Flow.Subscriber<? super OutboundSseEvent> subscriber: subscribers.keySet()) {
+        for (Map.Entry<Flow.Subscriber<? super OutboundSseEvent>, SseUnboundedSubscription> entry: subscribers.entrySet()) {
             try {
-                subscriber.onNext(event);
+                entry.getValue().send(event);
             } catch (final Exception ex) {
-                exceptioners.forEach(exceptioner -> exceptioner.accept(subscriber, ex));
+                exceptioners.forEach(exceptioner -> exceptioner.accept(entry.getKey(), ex));
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cxf/blob/a4b98454/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseUnboundedSubscription.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseUnboundedSubscription.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseUnboundedSubscription.java
new file mode 100644
index 0000000..7c0cb60
--- /dev/null
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseUnboundedSubscription.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.sse;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.ws.rs.Flow;
+import javax.ws.rs.Flow.Subscription;
+import javax.ws.rs.sse.OutboundSseEvent;
+
+/**
+ * Subscription that keeps the events passed to {@link #send(OutboundSseEvent)} in an unbounded
+ * buffer and delivers them to its subscriber only as far as the subscriber has requested.
+ */
+class SseUnboundedSubscription implements Subscription {
+    // Has subscription been cancelled or not? Volatile so that a cancellation issued
+    // from another thread is seen by the delivery loop.
+    private volatile boolean cancelled;
+    // Current demand: what has been requested but not yet delivered. Long.MAX_VALUE
+    // stands for effectively unbounded demand and is never decremented.
+    // NOTE(review): not guarded by a lock, assumes request() and send() do not race - confirm.
+    private long demand;
+    private final BlockingQueue<OutboundSseEvent> buffer = new LinkedBlockingQueue<>();
+    private final Flow.Subscriber<? super OutboundSseEvent> subscriber;
+
+    public SseUnboundedSubscription(Flow.Subscriber<? super OutboundSseEvent> subscriber) {
+        this.subscriber = subscriber;
+    }
+
+    /**
+     * Adds {@code n} to the outstanding demand and delivers buffered events accordingly.
+     * A non-positive {@code n} cancels the subscription and is reported to the subscriber
+     * through {@code onError} with an {@link IllegalArgumentException}.
+     *
+     * @param n number of additional events the subscriber is ready to receive
+     */
+    @Override
+    public void request(long n) {
+        if (n <= 0) {
+            // Never add a non-positive amount: it would shrink the demand or be mistaken for
+            // an overflow and make the demand unbounded.
+            cancelled = true;
+            subscriber.onError(new IllegalArgumentException("Requested " + n + " events, expected > 0"));
+            return;
+        }
+
+        // Both operands are non-negative here, so a negative sum can only mean overflow:
+        // saturate to effectively unbounded demand.
+        final long total = demand + n;
+        demand = total < 0 ? Long.MAX_VALUE : total;
+        send();
+    }
+
+    /**
+     * Cancels the subscription: the delivery loop stops and events still in the buffer
+     * are not handed to the subscriber.
+     */
+    @Override
+    public void cancel() {
+        cancelled = true;
+    }
+
+    /**
+     * Buffers the event and delivers as many buffered events as the current demand allows.
+     * Does nothing once the subscription has been cancelled.
+     *
+     * @param event event to deliver to the subscriber
+     * @throws InterruptedException declared for callers, not thrown by this implementation
+     */
+    public void send(OutboundSseEvent event) throws InterruptedException {
+        if (!cancelled && buffer.offer(event)) {
+            send();
+        }
+    }
+
+    // Drains the buffer for as long as the subscription is active and there is demand left.
+    private void send() {
+        while (!cancelled && demand > 0 && !buffer.isEmpty()) {
+            final OutboundSseEvent event = buffer.poll();
+            if (event != null) {
+                subscriber.onNext(event);
+                if (demand != Long.MAX_VALUE) {
+                    --demand;
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/a4b98454/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkImpl.java
index c58e58c..268e9d2 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkImpl.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkImpl.java
@@ -113,7 +113,7 @@ public class SseAtmosphereEventSinkImpl implements SseEventSink {
                     if (!future.isDone()) {
                         // Let us wait at least 200 milliseconds before returning to ensure
                         // that SSE had the opportunity to be delivered.
-                        LOG.info("Waiting 200ms to ensure SSE Atmosphere response is delivered");
+                        LOG.fine("Waiting 200ms to ensure SSE Atmosphere response is delivered");
                         future.get(200, TimeUnit.MILLISECONDS);
                     }
                 } catch (final ExecutionException | InterruptedException ex) {