You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2018/06/01 12:41:34 UTC

[1/2] aries-jax-rs-whiteboard git commit: Bring SSE changes from CXF 3.2.5-SNAPSHOT

Repository: aries-jax-rs-whiteboard
Updated Branches:
  refs/heads/master 1d41c82d6 -> 7e1c84bed


Bring SSE changes from CXF 3.2.5-SNAPSHOT


Project: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/commit/7183bfd9
Tree: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/tree/7183bfd9
Diff: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/diff/7183bfd9

Branch: refs/heads/master
Commit: 7183bfd91f89ac99c84e711c309218cdda0c5638
Parents: 1d41c82
Author: Carlos Sierra <cs...@apache.org>
Authored: Fri Jun 1 10:16:55 2018 +0200
Committer: Carlos Sierra <cs...@apache.org>
Committed: Fri Jun 1 14:36:39 2018 +0200

----------------------------------------------------------------------
 .../cxf/sse/OutboundSseEventBodyWriter.java     |  31 +--
 .../internal/cxf/sse/OutboundSseEventImpl.java  |  31 +--
 .../internal/cxf/sse/SseBroadcasterImpl.java    |  80 ++++----
 .../internal/cxf/sse/SseContextProvider.java    |  31 +--
 .../cxf/sse/SseEventSinkContextProvider.java    |  86 ++------
 .../internal/cxf/sse/SseEventSinkImpl.java      | 197 ++++++++++---------
 .../rs/whiteboard/internal/cxf/sse/SseImpl.java |  31 +--
 7 files changed, 227 insertions(+), 260 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/7183bfd9/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventBodyWriter.java
----------------------------------------------------------------------
diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventBodyWriter.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventBodyWriter.java
index 954b8c5..32869e6 100644
--- a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventBodyWriter.java
+++ b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventBodyWriter.java
@@ -1,20 +1,21 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-
 package org.apache.aries.jax.rs.whiteboard.internal.cxf.sse;
 
 import java.io.IOException;
@@ -137,4 +138,4 @@ public class OutboundSseEventBodyWriter implements MessageBodyWriter<OutboundSse
             MediaType mediaType) {
         return -1;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/7183bfd9/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventImpl.java
----------------------------------------------------------------------
diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventImpl.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventImpl.java
index 2b495e8..85e9e5b 100644
--- a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventImpl.java
+++ b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventImpl.java
@@ -1,20 +1,21 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-
 package org.apache.aries.jax.rs.whiteboard.internal.cxf.sse;
 
 import java.lang.reflect.Type;
@@ -178,4 +179,4 @@ public final class OutboundSseEventImpl implements OutboundSseEvent {
     public Object getData() {
         return data;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/7183bfd9/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseBroadcasterImpl.java
----------------------------------------------------------------------
diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseBroadcasterImpl.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseBroadcasterImpl.java
index 4ead14d..eee4d70 100644
--- a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseBroadcasterImpl.java
+++ b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseBroadcasterImpl.java
@@ -1,20 +1,21 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-
 package org.apache.aries.jax.rs.whiteboard.internal.cxf.sse;
 
 import java.io.IOException;
@@ -24,6 +25,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
@@ -36,22 +38,18 @@ import javax.ws.rs.sse.SseEventSink;
 
 public class SseBroadcasterImpl implements SseBroadcaster {
     private final Set<SseEventSink> subscribers = new CopyOnWriteArraySet<>();
-
-    private final Set<Consumer<SseEventSink>> closers =
-            new CopyOnWriteArraySet<>();
-
-    private final Set<BiConsumer<SseEventSink, Throwable>> exceptioners =
-            new CopyOnWriteArraySet<>();
+    private final Set<Consumer<SseEventSink>> closers = new CopyOnWriteArraySet<>();
+    private final Set<BiConsumer<SseEventSink, Throwable>> exceptioners = new CopyOnWriteArraySet<>();
+    private final AtomicBoolean closed = new AtomicBoolean(false);
 
     @Override
     public void register(SseEventSink sink) {
-        if (closed) throw new IllegalStateException("Already closed");
+        assertNotClosed();
 
-        SseEventSinkImpl sinkImpl = (SseEventSinkImpl)sink;
+        final SseEventSinkImpl sinkImpl = (SseEventSinkImpl)sink;
+        final AsyncContext ctx = sinkImpl.getAsyncContext();
 
-        AsyncContext asyncContext = sinkImpl.getAsyncContext();
-
-        asyncContext.addListener(new AsyncListener() {
+        ctx.addListener(new AsyncListener() {
             @Override
             public void onComplete(AsyncEvent asyncEvent) throws IOException {
                 subscribers.remove(sink);
@@ -78,16 +76,14 @@ public class SseBroadcasterImpl implements SseBroadcaster {
 
     @Override
     public CompletionStage<?> broadcast(OutboundSseEvent event) {
-        if (closed) throw new IllegalStateException("Already closed");
+        assertNotClosed();
 
         final Collection<CompletableFuture<?>> futures = new ArrayList<>();
-        
         for (SseEventSink sink: subscribers) {
             try {
                 futures.add(sink.send(event).toCompletableFuture());
             } catch (final Exception ex) {
-                exceptioners.forEach(
-                    exceptioner -> exceptioner.accept(sink, ex));
+                exceptioners.forEach(exceptioner -> exceptioner.accept(sink, ex));
             }
         }
         
@@ -96,27 +92,29 @@ public class SseBroadcasterImpl implements SseBroadcaster {
 
     @Override
     public void onClose(Consumer<SseEventSink> subscriber) {
-        if (closed) throw new IllegalStateException("Already closed");
-
+        assertNotClosed();
         closers.add(subscriber);
     }
 
     @Override
     public void onError(BiConsumer<SseEventSink, Throwable> exceptioner) {
-        if (closed) throw new IllegalStateException("Already closed");
-
+        assertNotClosed();
         exceptioners.add(exceptioner);
     }
 
     @Override
     public void close() {
-        closed = true;
-
-        subscribers.forEach(subscriber -> {
-            subscriber.close();
-            closers.forEach(closer -> closer.accept(subscriber));
-        });
+        if (closed.compareAndSet(false, true)) {
+            subscribers.forEach(subscriber -> {
+                subscriber.close();
+                closers.forEach(closer -> closer.accept(subscriber));
+            });
+        }
     }
 
-    private volatile boolean closed;
-}
\ No newline at end of file
+    private void assertNotClosed() {
+        if (closed.get()) {
+            throw new IllegalStateException("The SSE broadcaster is already closed");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/7183bfd9/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseContextProvider.java
----------------------------------------------------------------------
diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseContextProvider.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseContextProvider.java
index 054765f..6c4b695 100644
--- a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseContextProvider.java
+++ b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseContextProvider.java
@@ -1,20 +1,21 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-
 package org.apache.aries.jax.rs.whiteboard.internal.cxf.sse;
 
 import javax.ws.rs.sse.Sse;
@@ -27,4 +28,4 @@ public class SseContextProvider implements ContextProvider<Sse> {
     public Sse createContext(Message message) {
         return new SseImpl();
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/7183bfd9/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkContextProvider.java
----------------------------------------------------------------------
diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkContextProvider.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkContextProvider.java
index 0e8ad9f..3240fe5 100644
--- a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkContextProvider.java
+++ b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkContextProvider.java
@@ -1,41 +1,35 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
 package org.apache.aries.jax.rs.whiteboard.internal.cxf.sse;
 
-import javax.servlet.AsyncContext;
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.ext.MessageBodyWriter;
 import javax.ws.rs.sse.OutboundSseEvent;
 import javax.ws.rs.sse.SseEventSink;
 
-import org.apache.cxf.interceptor.Fault;
-import org.apache.cxf.interceptor.Interceptor;
 import org.apache.cxf.jaxrs.ext.ContextProvider;
+import org.apache.cxf.jaxrs.impl.AsyncResponseImpl;
 import org.apache.cxf.jaxrs.provider.ServerProviderFactory;
 import org.apache.cxf.message.Message;
-import org.apache.cxf.phase.Phase;
-import org.apache.cxf.phase.PhaseInterceptor;
 import org.apache.cxf.transport.http.AbstractHTTPDestination;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Set;
-
 public class SseEventSinkContextProvider implements ContextProvider<SseEventSink> {
 
     @Override
@@ -48,51 +42,7 @@ public class SseEventSinkContextProvider implements ContextProvider<SseEventSink
         final MessageBodyWriter<OutboundSseEvent> writer = new OutboundSseEventBodyWriter(
             ServerProviderFactory.getInstance(message), message.getExchange());
 
-        AsyncContext ctx = request.startAsync();
-        ctx.setTimeout(0);
-
-        message.getInterceptorChain().add(new SuspendPhaseInterceptor());
-
-        return new SseEventSinkImpl(writer, ctx);
-    }
-
-    private static class SuspendPhaseInterceptor
-        implements PhaseInterceptor<Message> {
-
-        @Override
-        public Set<String> getAfter() {
-            return Collections.emptySet();
-        }
-
-        @Override
-        public Set<String> getBefore() {
-            return Collections.singleton(
-                "org.apache.cxf.interceptor.OutgoingChainInterceptor");
-        }
-
-        @Override
-        public String getId() {
-            return "SSE SUSPEND";
-        }
-
-        @Override
-        public String getPhase() {
-            return Phase.POST_INVOKE;
-        }
-
-        @Override
-        public Collection<PhaseInterceptor<? extends Message>> getAdditionalInterceptors() {
-            return Collections.emptySet();
-        }
-
-        @Override
-        public void handleMessage(Message message) throws Fault {
-            message.getInterceptorChain().suspend();
-        }
-
-        @Override
-        public void handleFault(Message message) {
-        }
-
+        final AsyncResponseImpl async = new AsyncResponseImpl(message);
+        return new SseEventSinkImpl(writer, async, request.getAsyncContext());
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/7183bfd9/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkImpl.java
----------------------------------------------------------------------
diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkImpl.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkImpl.java
index b29ad20..cdcacb1 100644
--- a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkImpl.java
+++ b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkImpl.java
@@ -1,31 +1,36 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
 package org.apache.aries.jax.rs.whiteboard.internal.cxf.sse;
 
 import java.lang.annotation.Annotation;
-import java.util.LinkedList;
 import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.LockSupport;
 import java.util.logging.Logger;
 
 import javax.servlet.AsyncContext;
-import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.ext.MessageBodyWriter;
 import javax.ws.rs.sse.OutboundSseEvent;
 import javax.ws.rs.sse.SseEventSink;
@@ -33,114 +38,124 @@ import javax.ws.rs.sse.SseEventSink;
 import org.apache.cxf.common.logging.LogUtils;
 
 public class SseEventSinkImpl implements SseEventSink {
+    private static final Annotation[] EMPTY_ANNOTATIONS = new Annotation [] {};
     private static final Logger LOG = LogUtils.getL7dLogger(SseEventSinkImpl.class);
-
-    public AsyncContext getAsyncContext() {
-        return ctx;
-    }
+    private static final int BUFFER_SIZE = 10000; // buffering 10000 messages
 
     private final AsyncContext ctx;
-
-    private static class QueuedEvent {
-    		final OutboundSseEvent event;
-    		final CompletableFuture<?> completion;
-    		
-		public QueuedEvent(OutboundSseEvent event, CompletableFuture<?> completion) {
-			this.event = event;
-			this.completion = completion;
-		}
-    }
-    
     private final MessageBodyWriter<OutboundSseEvent> writer;
-    private final Queue<QueuedEvent> queuedEvents;
-    private boolean dequeueing;
-    
-    private volatile boolean closed;
+    private final Queue<QueuedEvent> buffer;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final AtomicBoolean dispatching = new AtomicBoolean(false);
 
-    public SseEventSinkImpl(final MessageBodyWriter<OutboundSseEvent> writer,
-            final AsyncContext ctx) {
+    public SseEventSinkImpl(final MessageBodyWriter<OutboundSseEvent> writer, 
+            final AsyncResponse async, final AsyncContext ctx) {
+        
         this.writer = writer;
-        this.queuedEvents = new LinkedList<>();
+        this.buffer = new ArrayBlockingQueue<>(BUFFER_SIZE);
         this.ctx = ctx;
 
         if (ctx == null) {
             throw new IllegalStateException("Unable to retrieve the AsyncContext for this request. "
-                    + "Is the Servlet configured properly?");
+                + "Is the Servlet configured properly?");
         }
-        
+
         ctx.getResponse().setContentType(OutboundSseEventBodyWriter.SERVER_SENT_EVENTS);
     }
 
+    public AsyncContext getAsyncContext() {
+        return ctx;
+    }
+
     @Override
     public void close() {
-        if (!closed) {
-            closed = true;
-
+        if (closed.compareAndSet(false, true)) {
+            // In case we are still dispatching, give the events the chance to be
+            // sent over to the consumers. The good example would be sent(event) call,
+            // immediately followed by the close() call.
+            if (!awaitQueueToDrain(5, TimeUnit.SECONDS)) {
+                LOG.warning("There are still SSE events the queue which may not be delivered (closing now)");
+            }
+            
             try {
                 ctx.complete();
-            } catch (final Exception ex) {
-                LOG.warning("Failed to close the AsyncContext cleanly: "
-                    + ex.getMessage());
+            } catch (final IllegalStateException ex) {
+                LOG.warning("Failed to close the AsyncContext cleanly: " + ex.getMessage());
             }
         }
     }
 
+    private boolean awaitQueueToDrain(int timeout, TimeUnit unit) {
+        final long parkTime = unit.toNanos(timeout) / 20;
+        int attempt = 0;
+        
+        while (dispatching.get() && ++attempt < 20) {
+            LockSupport.parkNanos(parkTime);
+        }
+        
+        return buffer.isEmpty();
+    }
+
+    @Override
+    public boolean isClosed() {
+        return closed.get();
+    }
+
     @Override
     public CompletionStage<?> send(OutboundSseEvent event) {
         final CompletableFuture<?> future = new CompletableFuture<>();
-        
-        if (!closed && writer != null) {
-        	    
-            boolean startDequeue;
-            synchronized (this) {
-                queuedEvents.offer(new QueuedEvent(event, future));
-                if(dequeueing) {
-                    startDequeue = false;
-                } else {
-                    startDequeue = true;
-                    dequeueing = true;
+
+        if (!closed.get() && writer != null) {
+            if (buffer.offer(new QueuedEvent(event, future))) {
+                if (dispatching.compareAndSet(false, true)) {
+                    ctx.start(this::dequeue);
                 }
+            } else {
+                future.completeExceptionally(new IllegalStateException(
+                    "The buffer is full (10000), unable to queue SSE event for send"));
             }
-        		
-            if(startDequeue) {
-                ctx.start(this::dequeue);
-            }
-        }
-        else {
-            future.complete(null);
+        } else {
+            future.completeExceptionally(new IllegalStateException(
+                "The sink is already closed, unable to queue SSE event for send"));
         }
 
         return future;
     }
-    
+
     private void dequeue() {
-    	
-    		for(;;) {
-    			QueuedEvent qe;
-    			synchronized (this) {
-	    			qe = queuedEvents.poll();
-    				if(qe == null) {
-    					dequeueing = false;
-    					break;
-    				}
-    			}
-			OutboundSseEvent event = qe.event;
-			CompletableFuture<?> future = qe.completion;
-			
-			try {
-                writer.writeTo(event, event.getClass(), event.getGenericType(), new Annotation [] {}, event.getMediaType(), null, ctx.getResponse().getOutputStream());
-                ctx.getResponse().flushBuffer();
-                future.complete(null);
-
-			} catch (final Exception ex) {
-				future.completeExceptionally(ex);
-			}
-    			
-    		}
+        try {
+            while (true) {
+                final QueuedEvent qeuedEvent = buffer.poll();
+                
+                // Nothing queued, release the thread
+                if (qeuedEvent == null) {
+                    break;
+                }
+                
+                final OutboundSseEvent event = qeuedEvent.event;
+                final CompletableFuture<?> future = qeuedEvent.completion;
+    
+                try {
+                    writer.writeTo(event, event.getClass(), event.getGenericType(), EMPTY_ANNOTATIONS,
+                        event.getMediaType(), null, ctx.getResponse().getOutputStream());
+                    ctx.getResponse().flushBuffer();
+                    future.complete(null);
+                } catch (final Exception ex) {
+                    future.completeExceptionally(ex);
+                }
+            }
+        } finally {
+            dispatching.set(false);
+        }
     }
 
-    @Override
-    public boolean isClosed() {
-        return closed;
+    private static class QueuedEvent {
+        private final OutboundSseEvent event;
+        private final CompletableFuture<?> completion;
+
+        QueuedEvent(OutboundSseEvent event, CompletableFuture<?> completion) {
+            this.event = event;
+            this.completion = completion;
+        }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/7183bfd9/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseImpl.java
----------------------------------------------------------------------
diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseImpl.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseImpl.java
index b9ce54f..b558120 100644
--- a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseImpl.java
+++ b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseImpl.java
@@ -1,20 +1,21 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-
 package org.apache.aries.jax.rs.whiteboard.internal.cxf.sse;
 
 import javax.ws.rs.sse.OutboundSseEvent.Builder;
@@ -34,4 +35,4 @@ class SseImpl implements Sse {
     public SseBroadcaster newBroadcaster() {
         return new SseBroadcasterImpl();
     }
-}
\ No newline at end of file
+}


[2/2] aries-jax-rs-whiteboard git commit: Bring back personal.bnd

Posted by cs...@apache.org.
Bring back personal.bnd


Project: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/commit/7e1c84be
Tree: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/tree/7e1c84be
Diff: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/diff/7e1c84be

Branch: refs/heads/master
Commit: 7e1c84bed50dc9667492c597e642ec4c9e709f86
Parents: 7183bfd
Author: Carlos Sierra <cs...@apache.org>
Authored: Fri Jun 1 14:38:52 2018 +0200
Committer: Carlos Sierra <cs...@apache.org>
Committed: Fri Jun 1 14:38:52 2018 +0200

----------------------------------------------------------------------
 jax-rs.itests/itest.bndrun | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/7e1c84be/jax-rs.itests/itest.bndrun
----------------------------------------------------------------------
diff --git a/jax-rs.itests/itest.bndrun b/jax-rs.itests/itest.bndrun
index 70c2f4f..8839de2 100644
--- a/jax-rs.itests/itest.bndrun
+++ b/jax-rs.itests/itest.bndrun
@@ -41,3 +41,5 @@
 	osgi.enroute.hamcrest.wrapper;version='[1.3.0,1.3.1)',\
 	osgi.enroute.junit.wrapper;version='[4.12.0,4.12.1)',\
 	slf4j.api;version='[1.7.25,1.7.26)'
+
+-include: -personal.bnd
\ No newline at end of file