You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2018/06/26 19:18:47 UTC
[08/10] aries-jax-rs-whiteboard git commit: Revert "Use latest CXF
version (3.2.5)"
Revert "Use latest CXF version (3.2.5)"
This reverts commit 7e16259c6ee784abff6d133711eaef735a76e10b.
Project: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/commit/bcc579fd
Tree: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/tree/bcc579fd
Diff: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/diff/bcc579fd
Branch: refs/heads/master
Commit: bcc579fd9bd2156dbe9d645a74e76976f23cbda4
Parents: ab0a825
Author: Carlos Sierra <cs...@apache.org>
Authored: Tue Jun 26 20:35:11 2018 +0200
Committer: Carlos Sierra <cs...@apache.org>
Committed: Tue Jun 26 21:11:18 2018 +0200
----------------------------------------------------------------------
.../cxf/CxfJaxrsServiceRegistrator.java | 4 +-
.../cxf/sse/OutboundSseEventBodyWriter.java | 141 ++++++++
.../internal/cxf/sse/OutboundSseEventImpl.java | 182 +++++++++++
.../internal/cxf/sse/SseBroadcasterImpl.java | 120 +++++++
.../internal/cxf/sse/SseContextProvider.java | 31 ++
.../cxf/sse/SseEventSinkContextProvider.java | 48 +++
.../internal/cxf/sse/SseEventSinkImpl.java | 161 ++++++++++
.../rs/whiteboard/internal/cxf/sse/SseImpl.java | 38 +++
.../cxf/jaxrs/impl/AsyncResponseImpl.java | 321 +++++++++++++++++++
pom.xml | 2 +-
10 files changed, 1045 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/bcc579fd/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/CxfJaxrsServiceRegistrator.java
----------------------------------------------------------------------
diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/CxfJaxrsServiceRegistrator.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/CxfJaxrsServiceRegistrator.java
index 54cb945..dc82163 100644
--- a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/CxfJaxrsServiceRegistrator.java
+++ b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/CxfJaxrsServiceRegistrator.java
@@ -40,6 +40,8 @@ import javax.ws.rs.core.Feature;
import javax.ws.rs.core.FeatureContext;
import javax.ws.rs.ext.RuntimeDelegate;
+import org.apache.aries.jax.rs.whiteboard.internal.cxf.sse.SseContextProvider;
+import org.apache.aries.jax.rs.whiteboard.internal.cxf.sse.SseEventSinkContextProvider;
import org.apache.aries.jax.rs.whiteboard.internal.utils.ServiceReferenceResourceProvider;
import org.apache.aries.jax.rs.whiteboard.internal.utils.ServiceTuple;
import org.apache.aries.component.dsl.CachingServiceReference;
@@ -54,8 +56,6 @@ import org.apache.cxf.jaxrs.model.ApplicationInfo;
import org.apache.cxf.jaxrs.model.ClassResourceInfo;
import org.apache.cxf.jaxrs.provider.ProviderFactory.ProviderInfoClassComparator;
import org.apache.cxf.jaxrs.provider.ServerConfigurableFactory;
-import org.apache.cxf.jaxrs.sse.SseContextProvider;
-import org.apache.cxf.jaxrs.sse.SseEventSinkContextProvider;
import org.apache.cxf.jaxrs.utils.AnnotationUtils;
import org.osgi.framework.ServiceObjects;
http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/bcc579fd/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventBodyWriter.java
----------------------------------------------------------------------
diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventBodyWriter.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventBodyWriter.java
new file mode 100644
index 0000000..32869e6
--- /dev/null
+++ b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventBodyWriter.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.jax.rs.whiteboard.internal.cxf.sse;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Type;
+import java.nio.charset.StandardCharsets;
+
+import javax.ws.rs.InternalServerErrorException;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.ext.MessageBodyWriter;
+import javax.ws.rs.ext.Provider;
+import javax.ws.rs.sse.OutboundSseEvent;
+
+import org.apache.cxf.jaxrs.provider.ServerProviderFactory;
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+
+/**
+ * {@link MessageBodyWriter} that serializes an {@link OutboundSseEvent} in the
+ * {@code text/event-stream} format: optional {@code event:}, {@code id:}, comment
+ * ({@code :}) and {@code retry:} lines, an optional {@code data:} line and a
+ * terminating empty line.
+ * <p>
+ * The event payload is written by another {@link MessageBodyWriter} obtained from the
+ * {@link ServerProviderFactory} passed to the public constructor.
+ */
+@Provider
+public class OutboundSseEventBodyWriter implements MessageBodyWriter<OutboundSseEvent> {
+    public static final String SERVER_SENT_EVENTS = "text/event-stream";
+    public static final MediaType SERVER_SENT_EVENTS_TYPE = MediaType.valueOf(SERVER_SENT_EVENTS);
+
+    private static final byte[] COMMENT = ": ".getBytes(StandardCharsets.UTF_8);
+    private static final byte[] EVENT = "event: ".getBytes(StandardCharsets.UTF_8);
+    private static final byte[] ID = "id: ".getBytes(StandardCharsets.UTF_8);
+    private static final byte[] RETRY = "retry: ".getBytes(StandardCharsets.UTF_8);
+    private static final byte[] DATA = "data: ".getBytes(StandardCharsets.UTF_8);
+    private static final byte[] NEW_LINE = "\n".getBytes(StandardCharsets.UTF_8);
+
+    private ServerProviderFactory factory;
+    private Message message;
+
+    /**
+     * Creates a writer without a provider factory. Such an instance cannot resolve a
+     * payload writer, so writing an event that carries data fails with an
+     * {@link InternalServerErrorException}.
+     */
+    protected OutboundSseEventBodyWriter() {
+    }
+
+    /**
+     * Creates a writer that resolves payload writers through the given factory.
+     *
+     * @param factory provider factory used to look up the payload {@link MessageBodyWriter}
+     * @param exchange exchange attached to the fresh {@link Message} handed to the factory
+     */
+    public OutboundSseEventBodyWriter(final ServerProviderFactory factory, final Exchange exchange) {
+        this.factory = factory;
+        this.message = new MessageImpl();
+        this.message.setExchange(exchange);
+    }
+
+
+    /**
+     * @return {@code true} when the class is an {@link OutboundSseEvent} or the media type
+     *         is compatible with {@code text/event-stream}
+     */
+    @Override
+    public boolean isWriteable(Class<?> cls, Type type, Annotation[] anns, MediaType mt) {
+        return OutboundSseEvent.class.isAssignableFrom(cls) || SERVER_SENT_EVENTS_TYPE.isCompatible(mt);
+    }
+
+    /**
+     * Writes the event fields followed by the empty line that terminates the event.
+     *
+     * @param p event to serialize
+     * @param anns annotations forwarded to the payload writer
+     * @param headers headers forwarded to the payload writer
+     * @param os stream receiving the serialized event
+     * @throws IOException if writing to {@code os} fails
+     * @throws WebApplicationException if no payload writer can be resolved
+     */
+    @Override
+    public void writeTo(OutboundSseEvent p, Class<?> cls, Type t, Annotation[] anns,
+            MediaType mt, MultivaluedMap<String, Object> headers, OutputStream os)
+                throws IOException, WebApplicationException {
+
+        if (p.getName() != null) {
+            writeField(os, EVENT, p.getName());
+        }
+
+        if (p.getId() != null) {
+            writeField(os, ID, p.getId());
+        }
+
+        if (p.getComment() != null) {
+            writeField(os, COMMENT, p.getComment());
+        }
+
+        if (p.getReconnectDelay() > 0) {
+            writeField(os, RETRY, Long.toString(p.getReconnectDelay()));
+        }
+
+        if (p.getData() != null) {
+            Class<?> payloadClass = p.getType();
+            Type payloadType = p.getGenericType();
+
+            if (payloadClass == null && payloadType == null) {
+                payloadType = Object.class;
+                payloadClass = Object.class;
+            } else if (payloadType == null) {
+                payloadType = payloadClass;
+            } else if (payloadClass == null) {
+                // Only a generic type was supplied (e.g. built from a GenericType): derive
+                // the class instead of handing null to the provider lookup below.
+                payloadClass = rawClassOf(payloadType, p.getData());
+            }
+
+            os.write(DATA);
+            writePayloadTo(payloadClass, payloadType, anns, p.getMediaType(), headers, p.getData(), os);
+            os.write(NEW_LINE);
+        }
+
+        os.write(NEW_LINE);
+    }
+
+    /** Writes one {@code prefix + value + "\n"} line, encoding the value as UTF-8. */
+    private static void writeField(OutputStream os, byte[] prefix, String value) throws IOException {
+        os.write(prefix);
+        os.write(value.getBytes(StandardCharsets.UTF_8));
+        os.write(NEW_LINE);
+    }
+
+    /**
+     * Resolves the class to use for the payload writer lookup from a generic type,
+     * falling back to the runtime class of the (non-null) data.
+     */
+    private static Class<?> rawClassOf(Type type, Object data) {
+        if (type instanceof Class) {
+            return (Class<?>) type;
+        }
+        if (type instanceof java.lang.reflect.ParameterizedType) {
+            final Type raw = ((java.lang.reflect.ParameterizedType) type).getRawType();
+            if (raw instanceof Class) {
+                return (Class<?>) raw;
+            }
+        }
+        return data.getClass();
+    }
+
+    /**
+     * Looks up a writer for the payload and delegates the serialization to it.
+     *
+     * @throws InternalServerErrorException if the factory or message is missing, or the
+     *         factory returns no writer for the payload
+     */
+    @SuppressWarnings("unchecked")
+    private<T> void writePayloadTo(Class<T> cls, Type type, Annotation[] anns, MediaType mt,
+            MultivaluedMap<String, Object> headers, Object data, OutputStream os)
+                throws IOException, WebApplicationException {
+
+        MessageBodyWriter<T> writer = null;
+        if (message != null && factory != null) {
+            writer = factory.createMessageBodyWriter(cls, type, anns, mt, message);
+        }
+
+        if (writer == null) {
+            throw new InternalServerErrorException("No suitable message body writer for class: " + cls.getName());
+        }
+
+        writer.writeTo((T)data, cls, type, anns, mt, headers, os);
+    }
+
+    /** @return always {@code -1}; the serialized size is not computed in advance */
+    @Override
+    public long getSize(OutboundSseEvent t, Class<?> type, Type genericType, Annotation[] annotations,
+            MediaType mediaType) {
+        return -1;
+    }
+}
http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/bcc579fd/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventImpl.java
----------------------------------------------------------------------
diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventImpl.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventImpl.java
new file mode 100644
index 0000000..85e9e5b
--- /dev/null
+++ b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventImpl.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.jax.rs.whiteboard.internal.cxf.sse;
+
+import java.lang.reflect.Type;
+
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.sse.OutboundSseEvent;
+
+public final class OutboundSseEventImpl implements OutboundSseEvent {
+ private final String id;
+ private final String name;
+ private final String comment;
+ private final long reconnectDelay;
+ private final Class<?> type;
+ private final Type genericType;
+ private final MediaType mediaType;
+ private final Object data;
+
+ public static class BuilderImpl implements Builder {
+ private String id;
+ private String name;
+ private String comment;
+ private long reconnectDelay = -1;
+ private Class<?> type;
+ private Type genericType;
+ private MediaType mediaType = MediaType.TEXT_PLAIN_TYPE;
+ private Object data;
+
+ @Override
+ public Builder id(String newId) {
+ this.id = newId;
+ return this;
+ }
+
+ @Override
+ public Builder name(String newName) {
+ this.name = newName;
+ return this;
+ }
+
+ @Override
+ public Builder reconnectDelay(long milliseconds) {
+ this.reconnectDelay = milliseconds;
+ return this;
+ }
+
+ @Override
+ public Builder mediaType(MediaType newMediaType) {
+ this.mediaType = newMediaType;
+ return this;
+ }
+
+ @Override
+ public Builder comment(String newComment) {
+ this.comment = newComment;
+ return this;
+ }
+
+ @Override
+ @SuppressWarnings("rawtypes")
+ public Builder data(Class newType, Object newData) {
+ if (newType == null || newData == null) {
+ throw new IllegalArgumentException("Parameters 'type' and 'data' must not be null.");
+ }
+ this.type = newType;
+ this.data = newData;
+ return this;
+ }
+
+ @Override
+ @SuppressWarnings("rawtypes")
+ public Builder data(GenericType newType, Object newData) {
+ if (newType == null || newData == null) {
+ throw new IllegalArgumentException("Parameters 'type' and 'data' must not be null.");
+ }
+ this.genericType = newType.getType();
+ this.data = newData;
+ return this;
+ }
+
+ @Override
+ public Builder data(Object newData) {
+ if (newData == null) {
+ throw new IllegalArgumentException("Parameter 'data' must not be null.");
+ }
+ this.type = newData.getClass();
+ this.data = newData;
+ return this;
+ }
+
+ @Override
+ public OutboundSseEvent build() {
+ return new OutboundSseEventImpl(
+ id,
+ name,
+ comment,
+ reconnectDelay,
+ type,
+ genericType,
+ mediaType,
+ data
+ );
+ }
+
+ }
+ //CHECKSTYLE:OFF
+ private OutboundSseEventImpl(String id, String name, String comment, long reconnectDelay,
+ Class<?> type, Type genericType, MediaType mediaType, Object data) {
+ this.id = id;
+ this.name = name;
+ this.comment = comment;
+ this.reconnectDelay = reconnectDelay;
+ this.type = type;
+ this.genericType = genericType;
+ this.mediaType = mediaType;
+ this.data = data;
+ }
+ //CHECKSTYLE:ON
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public String getComment() {
+ return comment;
+ }
+
+ @Override
+ public long getReconnectDelay() {
+ return reconnectDelay;
+ }
+
+ @Override
+ public boolean isReconnectDelaySet() {
+ return reconnectDelay != -1;
+ }
+
+ @Override
+ public Class<?> getType() {
+ return type;
+ }
+
+ @Override
+ public Type getGenericType() {
+ return genericType;
+ }
+
+ @Override
+ public MediaType getMediaType() {
+ return mediaType;
+ }
+
+ @Override
+ public Object getData() {
+ return data;
+ }
+}
http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/bcc579fd/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseBroadcasterImpl.java
----------------------------------------------------------------------
diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseBroadcasterImpl.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseBroadcasterImpl.java
new file mode 100644
index 0000000..eee4d70
--- /dev/null
+++ b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseBroadcasterImpl.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.jax.rs.whiteboard.internal.cxf.sse;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.AsyncEvent;
+import javax.servlet.AsyncListener;
+import javax.ws.rs.sse.OutboundSseEvent;
+import javax.ws.rs.sse.SseBroadcaster;
+import javax.ws.rs.sse.SseEventSink;
+
+public class SseBroadcasterImpl implements SseBroadcaster {
+ private final Set<SseEventSink> subscribers = new CopyOnWriteArraySet<>();
+ private final Set<Consumer<SseEventSink>> closers = new CopyOnWriteArraySet<>();
+ private final Set<BiConsumer<SseEventSink, Throwable>> exceptioners = new CopyOnWriteArraySet<>();
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+ @Override
+ public void register(SseEventSink sink) {
+ assertNotClosed();
+
+ final SseEventSinkImpl sinkImpl = (SseEventSinkImpl)sink;
+ final AsyncContext ctx = sinkImpl.getAsyncContext();
+
+ ctx.addListener(new AsyncListener() {
+ @Override
+ public void onComplete(AsyncEvent asyncEvent) throws IOException {
+ subscribers.remove(sink);
+ }
+
+ @Override
+ public void onTimeout(AsyncEvent asyncEvent) throws IOException {
+ subscribers.remove(sink);
+ }
+
+ @Override
+ public void onError(AsyncEvent asyncEvent) throws IOException {
+ subscribers.remove(sink);
+ }
+
+ @Override
+ public void onStartAsync(AsyncEvent asyncEvent) throws IOException {
+
+ }
+ });
+
+ subscribers.add(sink);
+ }
+
+ @Override
+ public CompletionStage<?> broadcast(OutboundSseEvent event) {
+ assertNotClosed();
+
+ final Collection<CompletableFuture<?>> futures = new ArrayList<>();
+ for (SseEventSink sink: subscribers) {
+ try {
+ futures.add(sink.send(event).toCompletableFuture());
+ } catch (final Exception ex) {
+ exceptioners.forEach(exceptioner -> exceptioner.accept(sink, ex));
+ }
+ }
+
+ return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
+ }
+
+ @Override
+ public void onClose(Consumer<SseEventSink> subscriber) {
+ assertNotClosed();
+ closers.add(subscriber);
+ }
+
+ @Override
+ public void onError(BiConsumer<SseEventSink, Throwable> exceptioner) {
+ assertNotClosed();
+ exceptioners.add(exceptioner);
+ }
+
+ @Override
+ public void close() {
+ if (closed.compareAndSet(false, true)) {
+ subscribers.forEach(subscriber -> {
+ subscriber.close();
+ closers.forEach(closer -> closer.accept(subscriber));
+ });
+ }
+ }
+
+ private void assertNotClosed() {
+ if (closed.get()) {
+ throw new IllegalStateException("The SSE broadcaster is already closed");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/bcc579fd/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseContextProvider.java
----------------------------------------------------------------------
diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseContextProvider.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseContextProvider.java
new file mode 100644
index 0000000..6c4b695
--- /dev/null
+++ b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseContextProvider.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.jax.rs.whiteboard.internal.cxf.sse;
+
+import javax.ws.rs.sse.Sse;
+
+import org.apache.cxf.jaxrs.ext.ContextProvider;
+import org.apache.cxf.message.Message;
+
+/**
+ * CXF {@link ContextProvider} that supplies the {@link Sse} entry point to resources.
+ */
+public class SseContextProvider implements ContextProvider<Sse> {
+    /**
+     * @param message the current message; not consulted
+     * @return a new {@link SseImpl} for every invocation
+     */
+    @Override
+    public Sse createContext(Message message) {
+        final Sse sse = new SseImpl();
+        return sse;
+    }
+}
http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/bcc579fd/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkContextProvider.java
----------------------------------------------------------------------
diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkContextProvider.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkContextProvider.java
new file mode 100644
index 0000000..3240fe5
--- /dev/null
+++ b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkContextProvider.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.aries.jax.rs.whiteboard.internal.cxf.sse;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.ext.MessageBodyWriter;
+import javax.ws.rs.sse.OutboundSseEvent;
+import javax.ws.rs.sse.SseEventSink;
+
+import org.apache.cxf.jaxrs.ext.ContextProvider;
+import org.apache.cxf.jaxrs.impl.AsyncResponseImpl;
+import org.apache.cxf.jaxrs.provider.ServerProviderFactory;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.transport.http.AbstractHTTPDestination;
+
+/**
+ * CXF {@link ContextProvider} that builds the {@link SseEventSink} for the current request.
+ */
+public class SseEventSinkContextProvider implements ContextProvider<SseEventSink> {
+
+    /**
+     * Creates a sink bound to the async context of the servlet request stored in the message.
+     *
+     * @param message the current inbound message
+     * @return a new {@link SseEventSinkImpl}
+     * @throws IllegalStateException if the message carries no HTTP servlet request
+     */
+    @Override
+    public SseEventSink createContext(Message message) {
+        final Object rawRequest = message.get(AbstractHTTPDestination.HTTP_REQUEST);
+        final HttpServletRequest request = (HttpServletRequest)rawRequest;
+        if (request == null) {
+            throw new IllegalStateException("Unable to retrieve HTTP request from the context");
+        }
+
+        final ServerProviderFactory providerFactory = ServerProviderFactory.getInstance(message);
+        final MessageBodyWriter<OutboundSseEvent> writer =
+            new OutboundSseEventBodyWriter(providerFactory, message.getExchange());
+
+        // NOTE(review): the AsyncResponseImpl is presumably created for its effect on the
+        // request (suspension) - its constructor is not visible here; confirm.
+        final AsyncResponseImpl async = new AsyncResponseImpl(message);
+        return new SseEventSinkImpl(writer, async, request.getAsyncContext());
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/bcc579fd/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkImpl.java
----------------------------------------------------------------------
diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkImpl.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkImpl.java
new file mode 100644
index 0000000..cdcacb1
--- /dev/null
+++ b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkImpl.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.aries.jax.rs.whiteboard.internal.cxf.sse;
+
+import java.lang.annotation.Annotation;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.LockSupport;
+import java.util.logging.Logger;
+
+import javax.servlet.AsyncContext;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.ext.MessageBodyWriter;
+import javax.ws.rs.sse.OutboundSseEvent;
+import javax.ws.rs.sse.SseEventSink;
+
+import org.apache.cxf.common.logging.LogUtils;
+
+/**
+ * {@link SseEventSink} that queues outgoing events in a bounded buffer and writes them
+ * to the servlet response from a task started on the request's {@link AsyncContext}.
+ * At most one dispatch task is active at a time, guarded by the {@code dispatching} flag.
+ */
+public class SseEventSinkImpl implements SseEventSink {
+    private static final Annotation[] EMPTY_ANNOTATIONS = new Annotation [] {};
+    private static final Logger LOG = LogUtils.getL7dLogger(SseEventSinkImpl.class);
+    private static final int BUFFER_SIZE = 10000; // buffering 10000 messages
+
+    private final AsyncContext ctx;
+    private final MessageBodyWriter<OutboundSseEvent> writer;
+    private final Queue<QueuedEvent> buffer;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final AtomicBoolean dispatching = new AtomicBoolean(false);
+
+    /**
+     * Creates the sink and sets the response content type to {@code text/event-stream}.
+     *
+     * @param writer writer used to serialize each event
+     * @param async async response of the request; not used by this constructor
+     * @param ctx async context whose response receives the events
+     * @throws IllegalStateException if {@code ctx} is {@code null}
+     */
+    public SseEventSinkImpl(final MessageBodyWriter<OutboundSseEvent> writer,
+            final AsyncResponse async, final AsyncContext ctx) {
+
+        this.writer = writer;
+        this.buffer = new ArrayBlockingQueue<>(BUFFER_SIZE);
+        this.ctx = ctx;
+
+        if (ctx == null) {
+            throw new IllegalStateException("Unable to retrieve the AsyncContext for this request. "
+                + "Is the Servlet configured properly?");
+        }
+
+        ctx.getResponse().setContentType(OutboundSseEventBodyWriter.SERVER_SENT_EVENTS);
+    }
+
+    public AsyncContext getAsyncContext() {
+        return ctx;
+    }
+
+    /**
+     * Closes the sink once: waits up to five seconds for an active dispatch to finish,
+     * then completes the async context.
+     */
+    @Override
+    public void close() {
+        if (closed.compareAndSet(false, true)) {
+            // In case we are still dispatching, give the events the chance to be
+            // sent over to the consumers. The good example would be send(event) call,
+            // immediately followed by the close() call.
+            if (!awaitQueueToDrain(5, TimeUnit.SECONDS)) {
+                LOG.warning("There are still SSE events the queue which may not be delivered (closing now)");
+            }
+
+            try {
+                ctx.complete();
+            } catch (final IllegalStateException ex) {
+                LOG.warning("Failed to close the AsyncContext cleanly: " + ex.getMessage());
+            }
+        }
+    }
+
+    /**
+     * Parks the calling thread in slices of 1/20 of the timeout (at most 19 times)
+     * while a dispatch task is active.
+     *
+     * @return {@code true} if the buffer is empty afterwards
+     */
+    private boolean awaitQueueToDrain(int timeout, TimeUnit unit) {
+        final long parkTime = unit.toNanos(timeout) / 20;
+        int attempt = 0;
+
+        while (dispatching.get() && ++attempt < 20) {
+            LockSupport.parkNanos(parkTime);
+        }
+
+        return buffer.isEmpty();
+    }
+
+    @Override
+    public boolean isClosed() {
+        return closed.get();
+    }
+
+    /**
+     * Queues the event and starts a dispatch task if none is active.
+     *
+     * @return a stage completed when the event has been written, or completed
+     *         exceptionally with an {@link IllegalStateException} when the sink is
+     *         closed (or has no writer) or the buffer is full
+     */
+    @Override
+    public CompletionStage<?> send(OutboundSseEvent event) {
+        final CompletableFuture<?> future = new CompletableFuture<>();
+
+        if (!closed.get() && writer != null) {
+            if (buffer.offer(new QueuedEvent(event, future))) {
+                if (dispatching.compareAndSet(false, true)) {
+                    ctx.start(this::dequeue);
+                }
+            } else {
+                future.completeExceptionally(new IllegalStateException(
+                    "The buffer is full (" + BUFFER_SIZE + "), unable to queue SSE event for send"));
+            }
+        } else {
+            future.completeExceptionally(new IllegalStateException(
+                "The sink is already closed, unable to queue SSE event for send"));
+        }
+
+        return future;
+    }
+
+    /**
+     * Dispatch task: drains the buffer, then releases the {@code dispatching} flag.
+     * <p>
+     * A producer may enqueue an event after the last {@code poll()} returned {@code null}
+     * but before the flag is released; it then sees the flag still set and does not start
+     * a new task. To avoid stranding that event, the buffer is checked again after the
+     * release and draining resumes on this thread if the flag can be re-acquired.
+     */
+    private void dequeue() {
+        do {
+            try {
+                drainBuffer();
+            } finally {
+                dispatching.set(false);
+            }
+        } while (!buffer.isEmpty() && dispatching.compareAndSet(false, true));
+    }
+
+    /** Writes and flushes every queued event, completing each event's future. */
+    private void drainBuffer() {
+        while (true) {
+            final QueuedEvent queuedEvent = buffer.poll();
+
+            // Nothing queued, release the thread
+            if (queuedEvent == null) {
+                break;
+            }
+
+            final OutboundSseEvent event = queuedEvent.event;
+            final CompletableFuture<?> future = queuedEvent.completion;
+
+            try {
+                writer.writeTo(event, event.getClass(), event.getGenericType(), EMPTY_ANNOTATIONS,
+                    event.getMediaType(), null, ctx.getResponse().getOutputStream());
+                ctx.getResponse().flushBuffer();
+                future.complete(null);
+            } catch (final Exception ex) {
+                // A failed write only fails this event; later events are still attempted.
+                future.completeExceptionally(ex);
+            }
+        }
+    }
+
+    /** Pairs a queued event with the future reported back to the sender. */
+    private static class QueuedEvent {
+        private final OutboundSseEvent event;
+        private final CompletableFuture<?> completion;
+
+        QueuedEvent(OutboundSseEvent event, CompletableFuture<?> completion) {
+            this.event = event;
+            this.completion = completion;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/bcc579fd/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseImpl.java
----------------------------------------------------------------------
diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseImpl.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseImpl.java
new file mode 100644
index 0000000..b558120
--- /dev/null
+++ b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseImpl.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.jax.rs.whiteboard.internal.cxf.sse;
+
+import javax.ws.rs.sse.OutboundSseEvent.Builder;
+import javax.ws.rs.sse.Sse;
+import javax.ws.rs.sse.SseBroadcaster;
+
+/**
+ * Package-private {@link Sse} implementation acting as a factory for event builders
+ * and broadcasters.
+ */
+class SseImpl implements Sse {
+    SseImpl() {
+    }
+
+    /** @return a fresh {@link OutboundSseEventImpl.BuilderImpl} */
+    @Override
+    public Builder newEventBuilder() {
+        final Builder builder = new OutboundSseEventImpl.BuilderImpl();
+        return builder;
+    }
+
+    /** @return a fresh {@link SseBroadcasterImpl} with no subscribers */
+    @Override
+    public SseBroadcaster newBroadcaster() {
+        final SseBroadcaster broadcaster = new SseBroadcasterImpl();
+        return broadcaster;
+    }
+}
http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/bcc579fd/jax-rs.whiteboard/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java
----------------------------------------------------------------------
diff --git a/jax-rs.whiteboard/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java b/jax-rs.whiteboard/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java
new file mode 100644
index 0000000..eb3a8d2
--- /dev/null
+++ b/jax-rs.whiteboard/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.impl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.ServiceUnavailableException;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.CompletionCallback;
+import javax.ws.rs.container.ConnectionCallback;
+import javax.ws.rs.container.TimeoutHandler;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.ResponseBuilder;
+
+import org.apache.cxf.continuations.Continuation;
+import org.apache.cxf.continuations.ContinuationCallback;
+import org.apache.cxf.continuations.ContinuationProvider;
+import org.apache.cxf.interceptor.Fault;
+import org.apache.cxf.jaxrs.utils.HttpUtils;
+import org.apache.cxf.message.Message;
+
+
+public class AsyncResponseImpl implements AsyncResponse, ContinuationCallback { // AsyncResponse driven by a CXF Continuation; also implements the continuation callbacks (onComplete/onError/onDisconnect)
+
+ private Continuation cont; // obtained from the message's ContinuationProvider in initContinuation()
+ private Message inMessage; // the message passed to the constructor
+ private TimeoutHandler timeoutHandler; // optional; consulted by handleTimeout()
+ private volatile boolean initialSuspend; // set true by initContinuation(); cleared by doResumeFinal(), setTimeout() and suspendContinuationIfNeeded()
+ private volatile boolean cancelled; // set to true by doCancel(); never reset in this class
+ private volatile boolean done; // set to true by onComplete()
+ private volatile boolean resumedByApplication; // set to true by doResumeFinal(); never reset in this class
+ private volatile Long pendingTimeout; // timeout in ms requested while the continuation was already pending; consumed by handleTimeout()
+
+ private List<CompletionCallback> completionCallbacks = new LinkedList<CompletionCallback>(); // filled by register(Object, Object...)
+ private List<ConnectionCallback> connectionCallbacks = new LinkedList<ConnectionCallback>(); // filled by register(Object, Object...)
+ private Throwable unmappedThrowable; // handed to the completion callbacks by onComplete()
+
+ public AsyncResponseImpl(Message inMessage) { // publishes this instance on the message and its exchange, then fetches the continuation
+ inMessage.put(AsyncResponse.class, this);
+ inMessage.getExchange().put(ContinuationCallback.class, this);
+ this.inMessage = inMessage;
+
+ initContinuation(); // must run after this.inMessage is assigned: it reads the provider from that field
+ }
+
+ @Override
+ public boolean resume(Object response) { // both resume overloads share doResume()
+ return doResume(response);
+ }
+
+ @Override
+ public boolean resume(Throwable response) {
+ return doResume(response);
+ }
+
+ private boolean isCancelledOrNotSuspended() { // true when a resume or setTimeout request must be rejected
+ return isCancelled() || !isSuspended();
+ }
+
+ private boolean doResume(Object response) { // returns false, leaving the continuation untouched, when cancelled or not suspended
+ if (isCancelledOrNotSuspended()) {
+ return false;
+ }
+ return doResumeFinal(response);
+ }
+ private synchronized boolean doResumeFinal(Object response) { // stores the response on the continuation and marks this as resumed; always returns true
+ inMessage.getExchange().put(AsyncResponse.class, this);
+ cont.setObject(response);
+ resumedByApplication = true;
+ if (!initialSuspend) { // cont.resume() is only called once the initial-suspend flag has already been cleared
+ cont.resume();
+ } else {
+ initialSuspend = false; // still in the initial-suspend state: only the flag is cleared, cont.resume() is not called
+ }
+ return true;
+ }
+
+ @Override
+ public boolean cancel() { // cancel without a Retry-After header
+ return doCancel(null);
+ }
+
+ @Override
+ public boolean cancel(int retryAfter) { // Retry-After value is the decimal string of retryAfter
+ return doCancel(Integer.toString(retryAfter));
+ }
+
+ @Override
+ public boolean cancel(Date retryAfter) { // Retry-After value is the date formatted with HttpUtils.getHttpDateFormat()
+ return doCancel(HttpUtils.getHttpDateFormat().format(retryAfter));
+ }
+
+ private boolean doCancel(String retryAfterHeader) { // resumes with a 503 response; retryAfterHeader may be null
+ if (cancelled) { // already cancelled: report success again without resuming a second time
+ return true;
+ }
+ if (!isSuspended()) { // not suspended (for example already resumed): nothing to cancel
+ return false;
+ }
+
+ cancelled = true; // set before resuming, so isSuspended() reports false from here on
+ ResponseBuilder rb = Response.status(503);
+ if (retryAfterHeader != null) {
+ rb.header(HttpHeaders.RETRY_AFTER, retryAfterHeader);
+ }
+ doResumeFinal(rb.build()); // bypasses doResume(), whose guard would now reject the call because cancelled is true
+ return cancelled; // true at this point: assigned above and never reset in this class
+ }
+
+ @Override
+ public boolean isSuspended() { // not cancelled, not resumed, and either in the initial-suspend state or the continuation is pending
+ if (cancelled || resumedByApplication) {
+ return false;
+ }
+ return initialSuspend || cont.isPending();
+ }
+
+ @Override
+ public synchronized boolean isCancelled() {
+ return cancelled;
+ }
+
+ @Override
+ public boolean isDone() { // true once onComplete() has run
+ return done;
+ }
+
+ @Override
+ public synchronized boolean setTimeout(long time, TimeUnit unit) throws IllegalStateException { // returns false when cancelled or not suspended, true otherwise
+ if (isCancelledOrNotSuspended()) {
+ return false;
+ }
+ setAsyncResponseOnExchange();
+ long timeout = TimeUnit.MILLISECONDS.convert(time, unit); // normalise the requested timeout to milliseconds
+ initialSuspend = false;
+ if (!cont.isPending()) { // not pending yet: suspend directly with the requested timeout
+ cont.suspend(timeout);
+ } else {
+ pendingTimeout = timeout; // already pending: remember the new value; handleTimeout() re-suspends with it when invoked
+ cont.resume();
+ }
+ return true;
+ }
+
+ private void setAsyncResponseOnExchange() { // stores this instance on the exchange under AsyncResponse.class
+ inMessage.getExchange().put(AsyncResponse.class, this);
+ }
+
+ @Override
+ public void setTimeoutHandler(TimeoutHandler handler) { // replaces any previously set handler; null is accepted as-is
+ timeoutHandler = handler;
+ }
+
+ @Override
+ public Collection<Class<?>> register(Class<?> callback) throws NullPointerException {
+ return register(callback, new Class<?>[]{}).get(callback); // NOTE(review): yields null when the overload below returns an empty map (instantiation failure)
+ }
+
+ @Override
+ public Map<Class<?>, Collection<Class<?>>> register(Class<?> callback, Class<?>... callbacks) // instantiates every class via newInstance() and delegates to register(Object, Object...)
+ throws NullPointerException {
+ try {
+ Object[] extraCallbacks = new Object[callbacks.length];
+ for (int i = 0; i < callbacks.length; i++) {
+ extraCallbacks[i] = callbacks[i].newInstance();
+ }
+ return register(callback.newInstance(), extraCallbacks);
+ } catch (NullPointerException e) { // NullPointerException is rethrown unchanged, as declared in the signature
+ throw e;
+ } catch (Throwable t) { // any other failure is swallowed and reported as an empty map
+ return Collections.emptyMap();
+ }
+
+ }
+
+ @Override
+ public Collection<Class<?>> register(Object callback) throws NullPointerException { // single-callback form; looks its result up by the callback's runtime class
+ return register(callback, new Object[]{}).get(callback.getClass());
+ }
+
+ @Override
+ public Map<Class<?>, Collection<Class<?>>> register(Object callback, Object... callbacks) // maps each callback's runtime class to the callback interfaces it was registered under
+ throws NullPointerException {
+ Map<Class<?>, Collection<Class<?>>> map =
+ new HashMap<Class<?>, Collection<Class<?>>>();
+
+ Object[] allCallbacks = new Object[1 + callbacks.length]; // first callback followed by the varargs, handled uniformly below
+ allCallbacks[0] = callback;
+ System.arraycopy(callbacks, 0, allCallbacks, 1, callbacks.length);
+
+ for (int i = 0; i < allCallbacks.length; i++) {
+ if (allCallbacks[i] == null) { // callbacks registered in earlier iterations stay registered when this throws
+ throw new NullPointerException();
+ }
+ Class<?> callbackCls = allCallbacks[i].getClass();
+ Collection<Class<?>> knownCallbacks = map.get(callbackCls);
+ if (knownCallbacks == null) { // first callback of this class: create its (possibly empty) result set
+ knownCallbacks = new HashSet<Class<?>>();
+ map.put(callbackCls, knownCallbacks);
+ }
+
+ if (allCallbacks[i] instanceof CompletionCallback) {
+ knownCallbacks.add(CompletionCallback.class);
+ completionCallbacks.add((CompletionCallback)allCallbacks[i]);
+ } else if (allCallbacks[i] instanceof ConnectionCallback) { // NOTE(review): because of else-if, an object implementing both interfaces is registered only as a CompletionCallback
+ knownCallbacks.add(ConnectionCallback.class);
+ connectionCallbacks.add((ConnectionCallback)allCallbacks[i]);
+ }
+ }
+ return map;
+ }
+
+ @Override
+ public void onComplete() { // marks this response done and notifies completion callbacks with unmappedThrowable (null unless setUnmappedThrowable was called)
+ done = true;
+ updateCompletionCallbacks(unmappedThrowable);
+ }
+
+ @Override
+ public void onError(Throwable error) { // notifies completion callbacks with the error; does not set done
+ updateCompletionCallbacks(error);
+ }
+
+ private void updateCompletionCallbacks(Throwable error) { // invokes every registered CompletionCallback in registration order
+ Throwable actualError = error instanceof Fault ? ((Fault)error).getCause() : error; // a Fault is replaced by its cause before being reported
+ for (CompletionCallback completionCallback : completionCallbacks) {
+ completionCallback.onComplete(actualError);
+ }
+ }
+
+ @Override
+ public void onDisconnect() { // invokes every registered ConnectionCallback with this response
+ for (ConnectionCallback connectionCallback : connectionCallbacks) {
+ connectionCallback.onDisconnect(this);
+ }
+ }
+
+ public synchronized boolean suspendContinuationIfNeeded() { // returns true only when this call actually suspended the continuation
+ if (!resumedByApplication && !isDone() && !cont.isPending() && !cont.isResumed()) {
+ cont.suspend(AsyncResponse.NO_TIMEOUT); // suspend with no timeout
+ initialSuspend = false;
+ return true;
+ }
+ return false;
+ }
+
+ @SuppressWarnings("resource") // Response that is built here shouldn't be closed here
+ public Object getResponseObject() { // returns the continuation's object, wrapped in a Response unless it is already a Response or a Throwable
+ Object obj = cont.getObject();
+ if (!(obj instanceof Response) && !(obj instanceof Throwable)) {
+ if (obj == null) {
+ obj = Response.noContent().build(); // nothing was set on the continuation
+ } else {
+ obj = Response.ok().entity(obj).build(); // any other object becomes the entity of an OK response
+ }
+ }
+ return obj;
+ }
+
+ public boolean isResumedByApplication() {
+ return resumedByApplication;
+ }
+
+ public synchronized void handleTimeout() { // does nothing once doResumeFinal() has run
+ if (!resumedByApplication) {
+ if (pendingTimeout != null) { // setTimeout() was called while pending: re-suspend with the new value
+ setAsyncResponseOnExchange();
+ cont.suspend(pendingTimeout);
+ pendingTimeout = null;
+ } else if (timeoutHandler != null) { // otherwise let the registered handler decide
+ timeoutHandler.handleTimeout(this);
+ } else {
+ cont.setObject(new ServiceUnavailableException()); // no handler registered: the continuation's object becomes a ServiceUnavailableException
+ }
+ }
+ }
+
+ private void initContinuation() { // fetches the continuation and enters the initial-suspend state
+ ContinuationProvider provider =
+ (ContinuationProvider)inMessage.get(ContinuationProvider.class.getName()); // provider is stored on the message under its class name; a missing provider causes a NullPointerException on the next line
+ cont = provider.getContinuation();
+ initialSuspend = true;
+ }
+
+ public void prepareContinuation() { // public entry point that re-runs initContinuation()
+ initContinuation();
+ }
+
+ public void setUnmappedThrowable(Throwable t) { // stored for onComplete(), which passes it to the completion callbacks
+ unmappedThrowable = t;
+ }
+ public void reset() { // delegates to the continuation's reset(); no field of this class is changed
+ cont.reset();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/bcc579fd/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 51b77bd..bbb9537 100644
--- a/pom.xml
+++ b/pom.xml
@@ -36,7 +36,7 @@
<packaging>pom</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <cxf.version>3.2.5</cxf.version>
+ <cxf.version>3.2.4</cxf.version>
<bnd.version>4.0.0</bnd.version>
</properties>