You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2018/06/01 12:41:34 UTC
[1/2] aries-jax-rs-whiteboard git commit: Bring SSE changes from CXF 3.2.5-SNAPSHOT
Repository: aries-jax-rs-whiteboard
Updated Branches:
refs/heads/master 1d41c82d6 -> 7e1c84bed
Bring SSE changes from CXF 3.2.5-SNAPSHOT
Project: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/commit/7183bfd9
Tree: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/tree/7183bfd9
Diff: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/diff/7183bfd9
Branch: refs/heads/master
Commit: 7183bfd91f89ac99c84e711c309218cdda0c5638
Parents: 1d41c82
Author: Carlos Sierra <cs...@apache.org>
Authored: Fri Jun 1 10:16:55 2018 +0200
Committer: Carlos Sierra <cs...@apache.org>
Committed: Fri Jun 1 14:36:39 2018 +0200
----------------------------------------------------------------------
.../cxf/sse/OutboundSseEventBodyWriter.java | 31 +--
.../internal/cxf/sse/OutboundSseEventImpl.java | 31 +--
.../internal/cxf/sse/SseBroadcasterImpl.java | 80 ++++----
.../internal/cxf/sse/SseContextProvider.java | 31 +--
.../cxf/sse/SseEventSinkContextProvider.java | 86 ++------
.../internal/cxf/sse/SseEventSinkImpl.java | 197 ++++++++++---------
.../rs/whiteboard/internal/cxf/sse/SseImpl.java | 31 +--
7 files changed, 227 insertions(+), 260 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/7183bfd9/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventBodyWriter.java
----------------------------------------------------------------------
diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventBodyWriter.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventBodyWriter.java
index 954b8c5..32869e6 100644
--- a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventBodyWriter.java
+++ b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventBodyWriter.java
@@ -1,20 +1,21 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-
package org.apache.aries.jax.rs.whiteboard.internal.cxf.sse;
import java.io.IOException;
@@ -137,4 +138,4 @@ public class OutboundSseEventBodyWriter implements MessageBodyWriter<OutboundSse
MediaType mediaType) {
return -1;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/7183bfd9/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventImpl.java
----------------------------------------------------------------------
diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventImpl.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventImpl.java
index 2b495e8..85e9e5b 100644
--- a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventImpl.java
+++ b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventImpl.java
@@ -1,20 +1,21 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-
package org.apache.aries.jax.rs.whiteboard.internal.cxf.sse;
import java.lang.reflect.Type;
@@ -178,4 +179,4 @@ public final class OutboundSseEventImpl implements OutboundSseEvent {
public Object getData() {
return data;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/7183bfd9/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseBroadcasterImpl.java
----------------------------------------------------------------------
diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseBroadcasterImpl.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseBroadcasterImpl.java
index 4ead14d..eee4d70 100644
--- a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseBroadcasterImpl.java
+++ b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseBroadcasterImpl.java
@@ -1,20 +1,21 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-
package org.apache.aries.jax.rs.whiteboard.internal.cxf.sse;
import java.io.IOException;
@@ -24,6 +25,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@@ -36,22 +38,18 @@ import javax.ws.rs.sse.SseEventSink;
public class SseBroadcasterImpl implements SseBroadcaster {
private final Set<SseEventSink> subscribers = new CopyOnWriteArraySet<>();
-
- private final Set<Consumer<SseEventSink>> closers =
- new CopyOnWriteArraySet<>();
-
- private final Set<BiConsumer<SseEventSink, Throwable>> exceptioners =
- new CopyOnWriteArraySet<>();
+ private final Set<Consumer<SseEventSink>> closers = new CopyOnWriteArraySet<>();
+ private final Set<BiConsumer<SseEventSink, Throwable>> exceptioners = new CopyOnWriteArraySet<>();
+ private final AtomicBoolean closed = new AtomicBoolean(false);
@Override
public void register(SseEventSink sink) {
- if (closed) throw new IllegalStateException("Already closed");
+ assertNotClosed();
- SseEventSinkImpl sinkImpl = (SseEventSinkImpl)sink;
+ final SseEventSinkImpl sinkImpl = (SseEventSinkImpl)sink;
+ final AsyncContext ctx = sinkImpl.getAsyncContext();
- AsyncContext asyncContext = sinkImpl.getAsyncContext();
-
- asyncContext.addListener(new AsyncListener() {
+ ctx.addListener(new AsyncListener() {
@Override
public void onComplete(AsyncEvent asyncEvent) throws IOException {
subscribers.remove(sink);
@@ -78,16 +76,14 @@ public class SseBroadcasterImpl implements SseBroadcaster {
@Override
public CompletionStage<?> broadcast(OutboundSseEvent event) {
- if (closed) throw new IllegalStateException("Already closed");
+ assertNotClosed();
final Collection<CompletableFuture<?>> futures = new ArrayList<>();
-
for (SseEventSink sink: subscribers) {
try {
futures.add(sink.send(event).toCompletableFuture());
} catch (final Exception ex) {
- exceptioners.forEach(
- exceptioner -> exceptioner.accept(sink, ex));
+ exceptioners.forEach(exceptioner -> exceptioner.accept(sink, ex));
}
}
@@ -96,27 +92,29 @@ public class SseBroadcasterImpl implements SseBroadcaster {
@Override
public void onClose(Consumer<SseEventSink> subscriber) {
- if (closed) throw new IllegalStateException("Already closed");
-
+ assertNotClosed();
closers.add(subscriber);
}
@Override
public void onError(BiConsumer<SseEventSink, Throwable> exceptioner) {
- if (closed) throw new IllegalStateException("Already closed");
-
+ assertNotClosed();
exceptioners.add(exceptioner);
}
@Override
public void close() {
- closed = true;
-
- subscribers.forEach(subscriber -> {
- subscriber.close();
- closers.forEach(closer -> closer.accept(subscriber));
- });
+ if (closed.compareAndSet(false, true)) {
+ subscribers.forEach(subscriber -> {
+ subscriber.close();
+ closers.forEach(closer -> closer.accept(subscriber));
+ });
+ }
}
- private volatile boolean closed;
-}
\ No newline at end of file
+ private void assertNotClosed() {
+ if (closed.get()) {
+ throw new IllegalStateException("The SSE broadcaster is already closed");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/7183bfd9/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseContextProvider.java
----------------------------------------------------------------------
diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseContextProvider.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseContextProvider.java
index 054765f..6c4b695 100644
--- a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseContextProvider.java
+++ b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseContextProvider.java
@@ -1,20 +1,21 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-
package org.apache.aries.jax.rs.whiteboard.internal.cxf.sse;
import javax.ws.rs.sse.Sse;
@@ -27,4 +28,4 @@ public class SseContextProvider implements ContextProvider<Sse> {
public Sse createContext(Message message) {
return new SseImpl();
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/7183bfd9/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkContextProvider.java
----------------------------------------------------------------------
diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkContextProvider.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkContextProvider.java
index 0e8ad9f..3240fe5 100644
--- a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkContextProvider.java
+++ b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkContextProvider.java
@@ -1,41 +1,35 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package org.apache.aries.jax.rs.whiteboard.internal.cxf.sse;
-import javax.servlet.AsyncContext;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.ext.MessageBodyWriter;
import javax.ws.rs.sse.OutboundSseEvent;
import javax.ws.rs.sse.SseEventSink;
-import org.apache.cxf.interceptor.Fault;
-import org.apache.cxf.interceptor.Interceptor;
import org.apache.cxf.jaxrs.ext.ContextProvider;
+import org.apache.cxf.jaxrs.impl.AsyncResponseImpl;
import org.apache.cxf.jaxrs.provider.ServerProviderFactory;
import org.apache.cxf.message.Message;
-import org.apache.cxf.phase.Phase;
-import org.apache.cxf.phase.PhaseInterceptor;
import org.apache.cxf.transport.http.AbstractHTTPDestination;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Set;
-
public class SseEventSinkContextProvider implements ContextProvider<SseEventSink> {
@Override
@@ -48,51 +42,7 @@ public class SseEventSinkContextProvider implements ContextProvider<SseEventSink
final MessageBodyWriter<OutboundSseEvent> writer = new OutboundSseEventBodyWriter(
ServerProviderFactory.getInstance(message), message.getExchange());
- AsyncContext ctx = request.startAsync();
- ctx.setTimeout(0);
-
- message.getInterceptorChain().add(new SuspendPhaseInterceptor());
-
- return new SseEventSinkImpl(writer, ctx);
- }
-
- private static class SuspendPhaseInterceptor
- implements PhaseInterceptor<Message> {
-
- @Override
- public Set<String> getAfter() {
- return Collections.emptySet();
- }
-
- @Override
- public Set<String> getBefore() {
- return Collections.singleton(
- "org.apache.cxf.interceptor.OutgoingChainInterceptor");
- }
-
- @Override
- public String getId() {
- return "SSE SUSPEND";
- }
-
- @Override
- public String getPhase() {
- return Phase.POST_INVOKE;
- }
-
- @Override
- public Collection<PhaseInterceptor<? extends Message>> getAdditionalInterceptors() {
- return Collections.emptySet();
- }
-
- @Override
- public void handleMessage(Message message) throws Fault {
- message.getInterceptorChain().suspend();
- }
-
- @Override
- public void handleFault(Message message) {
- }
-
+ final AsyncResponseImpl async = new AsyncResponseImpl(message);
+ return new SseEventSinkImpl(writer, async, request.getAsyncContext());
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/7183bfd9/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkImpl.java
----------------------------------------------------------------------
diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkImpl.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkImpl.java
index b29ad20..cdcacb1 100644
--- a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkImpl.java
+++ b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkImpl.java
@@ -1,31 +1,36 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package org.apache.aries.jax.rs.whiteboard.internal.cxf.sse;
import java.lang.annotation.Annotation;
-import java.util.LinkedList;
import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.LockSupport;
import java.util.logging.Logger;
import javax.servlet.AsyncContext;
-import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.ext.MessageBodyWriter;
import javax.ws.rs.sse.OutboundSseEvent;
import javax.ws.rs.sse.SseEventSink;
@@ -33,114 +38,124 @@ import javax.ws.rs.sse.SseEventSink;
import org.apache.cxf.common.logging.LogUtils;
public class SseEventSinkImpl implements SseEventSink {
+ private static final Annotation[] EMPTY_ANNOTATIONS = new Annotation [] {};
private static final Logger LOG = LogUtils.getL7dLogger(SseEventSinkImpl.class);
-
- public AsyncContext getAsyncContext() {
- return ctx;
- }
+ private static final int BUFFER_SIZE = 10000; // buffering 10000 messages
private final AsyncContext ctx;
-
- private static class QueuedEvent {
- final OutboundSseEvent event;
- final CompletableFuture<?> completion;
-
- public QueuedEvent(OutboundSseEvent event, CompletableFuture<?> completion) {
- this.event = event;
- this.completion = completion;
- }
- }
-
private final MessageBodyWriter<OutboundSseEvent> writer;
- private final Queue<QueuedEvent> queuedEvents;
- private boolean dequeueing;
-
- private volatile boolean closed;
+ private final Queue<QueuedEvent> buffer;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+ private final AtomicBoolean dispatching = new AtomicBoolean(false);
- public SseEventSinkImpl(final MessageBodyWriter<OutboundSseEvent> writer,
- final AsyncContext ctx) {
+ public SseEventSinkImpl(final MessageBodyWriter<OutboundSseEvent> writer,
+ final AsyncResponse async, final AsyncContext ctx) {
+
this.writer = writer;
- this.queuedEvents = new LinkedList<>();
+ this.buffer = new ArrayBlockingQueue<>(BUFFER_SIZE);
this.ctx = ctx;
if (ctx == null) {
throw new IllegalStateException("Unable to retrieve the AsyncContext for this request. "
- + "Is the Servlet configured properly?");
+ + "Is the Servlet configured properly?");
}
-
+
ctx.getResponse().setContentType(OutboundSseEventBodyWriter.SERVER_SENT_EVENTS);
}
+ public AsyncContext getAsyncContext() {
+ return ctx;
+ }
+
@Override
public void close() {
- if (!closed) {
- closed = true;
-
+ if (closed.compareAndSet(false, true)) {
+ // In case we are still dispatching, give the events the chance to be
+ // sent over to the consumers. The good example would be sent(event) call,
+ // immediately followed by the close() call.
+ if (!awaitQueueToDrain(5, TimeUnit.SECONDS)) {
+ LOG.warning("There are still SSE events the queue which may not be delivered (closing now)");
+ }
+
try {
ctx.complete();
- } catch (final Exception ex) {
- LOG.warning("Failed to close the AsyncContext cleanly: "
- + ex.getMessage());
+ } catch (final IllegalStateException ex) {
+ LOG.warning("Failed to close the AsyncContext cleanly: " + ex.getMessage());
}
}
}
+ private boolean awaitQueueToDrain(int timeout, TimeUnit unit) {
+ final long parkTime = unit.toNanos(timeout) / 20;
+ int attempt = 0;
+
+ while (dispatching.get() && ++attempt < 20) {
+ LockSupport.parkNanos(parkTime);
+ }
+
+ return buffer.isEmpty();
+ }
+
+ @Override
+ public boolean isClosed() {
+ return closed.get();
+ }
+
@Override
public CompletionStage<?> send(OutboundSseEvent event) {
final CompletableFuture<?> future = new CompletableFuture<>();
-
- if (!closed && writer != null) {
-
- boolean startDequeue;
- synchronized (this) {
- queuedEvents.offer(new QueuedEvent(event, future));
- if(dequeueing) {
- startDequeue = false;
- } else {
- startDequeue = true;
- dequeueing = true;
+
+ if (!closed.get() && writer != null) {
+ if (buffer.offer(new QueuedEvent(event, future))) {
+ if (dispatching.compareAndSet(false, true)) {
+ ctx.start(this::dequeue);
}
+ } else {
+ future.completeExceptionally(new IllegalStateException(
+ "The buffer is full (10000), unable to queue SSE event for send"));
}
-
- if(startDequeue) {
- ctx.start(this::dequeue);
- }
- }
- else {
- future.complete(null);
+ } else {
+ future.completeExceptionally(new IllegalStateException(
+ "The sink is already closed, unable to queue SSE event for send"));
}
return future;
}
-
+
private void dequeue() {
-
- for(;;) {
- QueuedEvent qe;
- synchronized (this) {
- qe = queuedEvents.poll();
- if(qe == null) {
- dequeueing = false;
- break;
- }
- }
- OutboundSseEvent event = qe.event;
- CompletableFuture<?> future = qe.completion;
-
- try {
- writer.writeTo(event, event.getClass(), event.getGenericType(), new Annotation [] {}, event.getMediaType(), null, ctx.getResponse().getOutputStream());
- ctx.getResponse().flushBuffer();
- future.complete(null);
-
- } catch (final Exception ex) {
- future.completeExceptionally(ex);
- }
-
- }
+ try {
+ while (true) {
+ final QueuedEvent qeuedEvent = buffer.poll();
+
+ // Nothing queued, release the thread
+ if (qeuedEvent == null) {
+ break;
+ }
+
+ final OutboundSseEvent event = qeuedEvent.event;
+ final CompletableFuture<?> future = qeuedEvent.completion;
+
+ try {
+ writer.writeTo(event, event.getClass(), event.getGenericType(), EMPTY_ANNOTATIONS,
+ event.getMediaType(), null, ctx.getResponse().getOutputStream());
+ ctx.getResponse().flushBuffer();
+ future.complete(null);
+ } catch (final Exception ex) {
+ future.completeExceptionally(ex);
+ }
+ }
+ } finally {
+ dispatching.set(false);
+ }
}
- @Override
- public boolean isClosed() {
- return closed;
+ private static class QueuedEvent {
+ private final OutboundSseEvent event;
+ private final CompletableFuture<?> completion;
+
+ QueuedEvent(OutboundSseEvent event, CompletableFuture<?> completion) {
+ this.event = event;
+ this.completion = completion;
+ }
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/7183bfd9/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseImpl.java
----------------------------------------------------------------------
diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseImpl.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseImpl.java
index b9ce54f..b558120 100644
--- a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseImpl.java
+++ b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseImpl.java
@@ -1,20 +1,21 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-
package org.apache.aries.jax.rs.whiteboard.internal.cxf.sse;
import javax.ws.rs.sse.OutboundSseEvent.Builder;
@@ -34,4 +35,4 @@ class SseImpl implements Sse {
public SseBroadcaster newBroadcaster() {
return new SseBroadcasterImpl();
}
-}
\ No newline at end of file
+}
[2/2] aries-jax-rs-whiteboard git commit: Bring back personal.bnd
Posted by cs...@apache.org.
Bring back personal.bnd
Project: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/commit/7e1c84be
Tree: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/tree/7e1c84be
Diff: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/diff/7e1c84be
Branch: refs/heads/master
Commit: 7e1c84bed50dc9667492c597e642ec4c9e709f86
Parents: 7183bfd
Author: Carlos Sierra <cs...@apache.org>
Authored: Fri Jun 1 14:38:52 2018 +0200
Committer: Carlos Sierra <cs...@apache.org>
Committed: Fri Jun 1 14:38:52 2018 +0200
----------------------------------------------------------------------
jax-rs.itests/itest.bndrun | 2 ++
1 file changed, 2 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/7e1c84be/jax-rs.itests/itest.bndrun
----------------------------------------------------------------------
diff --git a/jax-rs.itests/itest.bndrun b/jax-rs.itests/itest.bndrun
index 70c2f4f..8839de2 100644
--- a/jax-rs.itests/itest.bndrun
+++ b/jax-rs.itests/itest.bndrun
@@ -41,3 +41,5 @@
osgi.enroute.hamcrest.wrapper;version='[1.3.0,1.3.1)',\
osgi.enroute.junit.wrapper;version='[4.12.0,4.12.1)',\
slf4j.api;version='[1.7.25,1.7.26)'
+
+-include: -personal.bnd
\ No newline at end of file