You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2018/06/26 19:50:02 UTC
[2/4] aries-jax-rs-whiteboard git commit: Use latest CXF version (3.2.5)
Use latest CXF version (3.2.5)
Project: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/commit/4c13daf7
Tree: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/tree/4c13daf7
Diff: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/diff/4c13daf7
Branch: refs/heads/master
Commit: 4c13daf76e1fe9637d30bd6ef0750a59d894524e
Parents: a6ba7b9
Author: Carlos Sierra <cs...@apache.org>
Authored: Tue Jun 26 21:46:09 2018 +0200
Committer: Carlos Sierra <cs...@apache.org>
Committed: Tue Jun 26 21:47:45 2018 +0200
----------------------------------------------------------------------
.../cxf/CxfJaxrsServiceRegistrator.java | 4 +-
.../cxf/sse/OutboundSseEventBodyWriter.java | 141 --------
.../internal/cxf/sse/OutboundSseEventImpl.java | 182 -----------
.../internal/cxf/sse/SseBroadcasterImpl.java | 120 -------
.../internal/cxf/sse/SseContextProvider.java | 31 --
.../cxf/sse/SseEventSinkContextProvider.java | 48 ---
.../internal/cxf/sse/SseEventSinkImpl.java | 161 ----------
.../rs/whiteboard/internal/cxf/sse/SseImpl.java | 38 ---
.../cxf/jaxrs/impl/AsyncResponseImpl.java | 321 -------------------
pom.xml | 2 +-
10 files changed, 3 insertions(+), 1045 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/4c13daf7/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/CxfJaxrsServiceRegistrator.java
----------------------------------------------------------------------
diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/CxfJaxrsServiceRegistrator.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/CxfJaxrsServiceRegistrator.java
index dc82163..54cb945 100644
--- a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/CxfJaxrsServiceRegistrator.java
+++ b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/CxfJaxrsServiceRegistrator.java
@@ -40,8 +40,6 @@ import javax.ws.rs.core.Feature;
import javax.ws.rs.core.FeatureContext;
import javax.ws.rs.ext.RuntimeDelegate;
-import org.apache.aries.jax.rs.whiteboard.internal.cxf.sse.SseContextProvider;
-import org.apache.aries.jax.rs.whiteboard.internal.cxf.sse.SseEventSinkContextProvider;
import org.apache.aries.jax.rs.whiteboard.internal.utils.ServiceReferenceResourceProvider;
import org.apache.aries.jax.rs.whiteboard.internal.utils.ServiceTuple;
import org.apache.aries.component.dsl.CachingServiceReference;
@@ -56,6 +54,8 @@ import org.apache.cxf.jaxrs.model.ApplicationInfo;
import org.apache.cxf.jaxrs.model.ClassResourceInfo;
import org.apache.cxf.jaxrs.provider.ProviderFactory.ProviderInfoClassComparator;
import org.apache.cxf.jaxrs.provider.ServerConfigurableFactory;
+import org.apache.cxf.jaxrs.sse.SseContextProvider;
+import org.apache.cxf.jaxrs.sse.SseEventSinkContextProvider;
import org.apache.cxf.jaxrs.utils.AnnotationUtils;
import org.osgi.framework.ServiceObjects;
http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/4c13daf7/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventBodyWriter.java
----------------------------------------------------------------------
diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventBodyWriter.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventBodyWriter.java
deleted file mode 100644
index 32869e6..0000000
--- a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventBodyWriter.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.jax.rs.whiteboard.internal.cxf.sse;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.lang.annotation.Annotation;
-import java.lang.reflect.Type;
-import java.nio.charset.StandardCharsets;
-
-import javax.ws.rs.InternalServerErrorException;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.MultivaluedMap;
-import javax.ws.rs.ext.MessageBodyWriter;
-import javax.ws.rs.ext.Provider;
-import javax.ws.rs.sse.OutboundSseEvent;
-
-import org.apache.cxf.jaxrs.provider.ServerProviderFactory;
-import org.apache.cxf.message.Exchange;
-import org.apache.cxf.message.Message;
-import org.apache.cxf.message.MessageImpl;
-
-@Provider
-public class OutboundSseEventBodyWriter implements MessageBodyWriter<OutboundSseEvent> {
- public static final String SERVER_SENT_EVENTS = "text/event-stream";
- public static final MediaType SERVER_SENT_EVENTS_TYPE = MediaType.valueOf(SERVER_SENT_EVENTS);
-
- private static final byte[] COMMENT = ": ".getBytes(StandardCharsets.UTF_8);
- private static final byte[] EVENT = "event: ".getBytes(StandardCharsets.UTF_8);
- private static final byte[] ID = "id: ".getBytes(StandardCharsets.UTF_8);
- private static final byte[] RETRY = "retry: ".getBytes(StandardCharsets.UTF_8);
- private static final byte[] DATA = "data: ".getBytes(StandardCharsets.UTF_8);
- private static final byte[] NEW_LINE = "\n".getBytes(StandardCharsets.UTF_8);
-
- private ServerProviderFactory factory;
- private Message message;
-
- protected OutboundSseEventBodyWriter() {
- }
-
- public OutboundSseEventBodyWriter(final ServerProviderFactory factory, final Exchange exchange) {
- this.factory = factory;
- this.message = new MessageImpl();
- this.message.setExchange(exchange);
- }
-
-
- @Override
- public boolean isWriteable(Class<?> cls, Type type, Annotation[] anns, MediaType mt) {
- return OutboundSseEvent.class.isAssignableFrom(cls) || SERVER_SENT_EVENTS_TYPE.isCompatible(mt);
- }
-
- @Override
- public void writeTo(OutboundSseEvent p, Class<?> cls, Type t, Annotation[] anns,
- MediaType mt, MultivaluedMap<String, Object> headers, OutputStream os)
- throws IOException, WebApplicationException {
-
- if (p.getName() != null) {
- os.write(EVENT);
- os.write(p.getName().getBytes(StandardCharsets.UTF_8));
- os.write(NEW_LINE);
- }
-
- if (p.getId() != null) {
- os.write(ID);
- os.write(p.getId().getBytes(StandardCharsets.UTF_8));
- os.write(NEW_LINE);
- }
-
- if (p.getComment() != null) {
- os.write(COMMENT);
- os.write(p.getComment().getBytes(StandardCharsets.UTF_8));
- os.write(NEW_LINE);
- }
-
- if (p.getReconnectDelay() > 0) {
- os.write(RETRY);
- os.write(Long.toString(p.getReconnectDelay()).getBytes(StandardCharsets.UTF_8));
- os.write(NEW_LINE);
- }
-
- if (p.getData() != null) {
- Class<?> payloadClass = p.getType();
- Type payloadType = p.getGenericType();
- if (payloadType == null) {
- payloadType = payloadClass;
- }
-
- if (payloadType == null && payloadClass == null) {
- payloadType = Object.class;
- payloadClass = Object.class;
- }
-
- os.write(DATA);
- writePayloadTo(payloadClass, payloadType, anns, p.getMediaType(), headers, p.getData(), os);
- os.write(NEW_LINE);
- }
-
- os.write(NEW_LINE);
- }
-
- @SuppressWarnings("unchecked")
- private<T> void writePayloadTo(Class<T> cls, Type type, Annotation[] anns, MediaType mt,
- MultivaluedMap<String, Object> headers, Object data, OutputStream os)
- throws IOException, WebApplicationException {
-
- MessageBodyWriter<T> writer = null;
- if (message != null && factory != null) {
- writer = factory.createMessageBodyWriter(cls, type, anns, mt, message);
- }
-
- if (writer == null) {
- throw new InternalServerErrorException("No suitable message body writer for class: " + cls.getName());
- }
-
- writer.writeTo((T)data, cls, type, anns, mt, headers, os);
- }
-
- @Override
- public long getSize(OutboundSseEvent t, Class<?> type, Type genericType, Annotation[] annotations,
- MediaType mediaType) {
- return -1;
- }
-}
http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/4c13daf7/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventImpl.java
----------------------------------------------------------------------
diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventImpl.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventImpl.java
deleted file mode 100644
index 85e9e5b..0000000
--- a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventImpl.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.jax.rs.whiteboard.internal.cxf.sse;
-
-import java.lang.reflect.Type;
-
-import javax.ws.rs.core.GenericType;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.sse.OutboundSseEvent;
-
-public final class OutboundSseEventImpl implements OutboundSseEvent {
- private final String id;
- private final String name;
- private final String comment;
- private final long reconnectDelay;
- private final Class<?> type;
- private final Type genericType;
- private final MediaType mediaType;
- private final Object data;
-
- public static class BuilderImpl implements Builder {
- private String id;
- private String name;
- private String comment;
- private long reconnectDelay = -1;
- private Class<?> type;
- private Type genericType;
- private MediaType mediaType = MediaType.TEXT_PLAIN_TYPE;
- private Object data;
-
- @Override
- public Builder id(String newId) {
- this.id = newId;
- return this;
- }
-
- @Override
- public Builder name(String newName) {
- this.name = newName;
- return this;
- }
-
- @Override
- public Builder reconnectDelay(long milliseconds) {
- this.reconnectDelay = milliseconds;
- return this;
- }
-
- @Override
- public Builder mediaType(MediaType newMediaType) {
- this.mediaType = newMediaType;
- return this;
- }
-
- @Override
- public Builder comment(String newComment) {
- this.comment = newComment;
- return this;
- }
-
- @Override
- @SuppressWarnings("rawtypes")
- public Builder data(Class newType, Object newData) {
- if (newType == null || newData == null) {
- throw new IllegalArgumentException("Parameters 'type' and 'data' must not be null.");
- }
- this.type = newType;
- this.data = newData;
- return this;
- }
-
- @Override
- @SuppressWarnings("rawtypes")
- public Builder data(GenericType newType, Object newData) {
- if (newType == null || newData == null) {
- throw new IllegalArgumentException("Parameters 'type' and 'data' must not be null.");
- }
- this.genericType = newType.getType();
- this.data = newData;
- return this;
- }
-
- @Override
- public Builder data(Object newData) {
- if (newData == null) {
- throw new IllegalArgumentException("Parameter 'data' must not be null.");
- }
- this.type = newData.getClass();
- this.data = newData;
- return this;
- }
-
- @Override
- public OutboundSseEvent build() {
- return new OutboundSseEventImpl(
- id,
- name,
- comment,
- reconnectDelay,
- type,
- genericType,
- mediaType,
- data
- );
- }
-
- }
- //CHECKSTYLE:OFF
- private OutboundSseEventImpl(String id, String name, String comment, long reconnectDelay,
- Class<?> type, Type genericType, MediaType mediaType, Object data) {
- this.id = id;
- this.name = name;
- this.comment = comment;
- this.reconnectDelay = reconnectDelay;
- this.type = type;
- this.genericType = genericType;
- this.mediaType = mediaType;
- this.data = data;
- }
- //CHECKSTYLE:ON
-
- @Override
- public String getId() {
- return id;
- }
-
- @Override
- public String getName() {
- return name;
- }
-
- @Override
- public String getComment() {
- return comment;
- }
-
- @Override
- public long getReconnectDelay() {
- return reconnectDelay;
- }
-
- @Override
- public boolean isReconnectDelaySet() {
- return reconnectDelay != -1;
- }
-
- @Override
- public Class<?> getType() {
- return type;
- }
-
- @Override
- public Type getGenericType() {
- return genericType;
- }
-
- @Override
- public MediaType getMediaType() {
- return mediaType;
- }
-
- @Override
- public Object getData() {
- return data;
- }
-}
http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/4c13daf7/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseBroadcasterImpl.java
----------------------------------------------------------------------
diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseBroadcasterImpl.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseBroadcasterImpl.java
deleted file mode 100644
index eee4d70..0000000
--- a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseBroadcasterImpl.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.jax.rs.whiteboard.internal.cxf.sse;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.BiConsumer;
-import java.util.function.Consumer;
-
-import javax.servlet.AsyncContext;
-import javax.servlet.AsyncEvent;
-import javax.servlet.AsyncListener;
-import javax.ws.rs.sse.OutboundSseEvent;
-import javax.ws.rs.sse.SseBroadcaster;
-import javax.ws.rs.sse.SseEventSink;
-
-public class SseBroadcasterImpl implements SseBroadcaster {
- private final Set<SseEventSink> subscribers = new CopyOnWriteArraySet<>();
- private final Set<Consumer<SseEventSink>> closers = new CopyOnWriteArraySet<>();
- private final Set<BiConsumer<SseEventSink, Throwable>> exceptioners = new CopyOnWriteArraySet<>();
- private final AtomicBoolean closed = new AtomicBoolean(false);
-
- @Override
- public void register(SseEventSink sink) {
- assertNotClosed();
-
- final SseEventSinkImpl sinkImpl = (SseEventSinkImpl)sink;
- final AsyncContext ctx = sinkImpl.getAsyncContext();
-
- ctx.addListener(new AsyncListener() {
- @Override
- public void onComplete(AsyncEvent asyncEvent) throws IOException {
- subscribers.remove(sink);
- }
-
- @Override
- public void onTimeout(AsyncEvent asyncEvent) throws IOException {
- subscribers.remove(sink);
- }
-
- @Override
- public void onError(AsyncEvent asyncEvent) throws IOException {
- subscribers.remove(sink);
- }
-
- @Override
- public void onStartAsync(AsyncEvent asyncEvent) throws IOException {
-
- }
- });
-
- subscribers.add(sink);
- }
-
- @Override
- public CompletionStage<?> broadcast(OutboundSseEvent event) {
- assertNotClosed();
-
- final Collection<CompletableFuture<?>> futures = new ArrayList<>();
- for (SseEventSink sink: subscribers) {
- try {
- futures.add(sink.send(event).toCompletableFuture());
- } catch (final Exception ex) {
- exceptioners.forEach(exceptioner -> exceptioner.accept(sink, ex));
- }
- }
-
- return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
- }
-
- @Override
- public void onClose(Consumer<SseEventSink> subscriber) {
- assertNotClosed();
- closers.add(subscriber);
- }
-
- @Override
- public void onError(BiConsumer<SseEventSink, Throwable> exceptioner) {
- assertNotClosed();
- exceptioners.add(exceptioner);
- }
-
- @Override
- public void close() {
- if (closed.compareAndSet(false, true)) {
- subscribers.forEach(subscriber -> {
- subscriber.close();
- closers.forEach(closer -> closer.accept(subscriber));
- });
- }
- }
-
- private void assertNotClosed() {
- if (closed.get()) {
- throw new IllegalStateException("The SSE broadcaster is already closed");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/4c13daf7/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseContextProvider.java
----------------------------------------------------------------------
diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseContextProvider.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseContextProvider.java
deleted file mode 100644
index 6c4b695..0000000
--- a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseContextProvider.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.jax.rs.whiteboard.internal.cxf.sse;
-
-import javax.ws.rs.sse.Sse;
-
-import org.apache.cxf.jaxrs.ext.ContextProvider;
-import org.apache.cxf.message.Message;
-
-public class SseContextProvider implements ContextProvider<Sse> {
- @Override
- public Sse createContext(Message message) {
- return new SseImpl();
- }
-}
http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/4c13daf7/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkContextProvider.java
----------------------------------------------------------------------
diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkContextProvider.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkContextProvider.java
deleted file mode 100644
index 3240fe5..0000000
--- a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkContextProvider.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.aries.jax.rs.whiteboard.internal.cxf.sse;
-
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.ext.MessageBodyWriter;
-import javax.ws.rs.sse.OutboundSseEvent;
-import javax.ws.rs.sse.SseEventSink;
-
-import org.apache.cxf.jaxrs.ext.ContextProvider;
-import org.apache.cxf.jaxrs.impl.AsyncResponseImpl;
-import org.apache.cxf.jaxrs.provider.ServerProviderFactory;
-import org.apache.cxf.message.Message;
-import org.apache.cxf.transport.http.AbstractHTTPDestination;
-
-public class SseEventSinkContextProvider implements ContextProvider<SseEventSink> {
-
- @Override
- public SseEventSink createContext(Message message) {
- final HttpServletRequest request = (HttpServletRequest)message.get(AbstractHTTPDestination.HTTP_REQUEST);
- if (request == null) {
- throw new IllegalStateException("Unable to retrieve HTTP request from the context");
- }
-
- final MessageBodyWriter<OutboundSseEvent> writer = new OutboundSseEventBodyWriter(
- ServerProviderFactory.getInstance(message), message.getExchange());
-
- final AsyncResponseImpl async = new AsyncResponseImpl(message);
- return new SseEventSinkImpl(writer, async, request.getAsyncContext());
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/4c13daf7/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkImpl.java
----------------------------------------------------------------------
diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkImpl.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkImpl.java
deleted file mode 100644
index cdcacb1..0000000
--- a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkImpl.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.aries.jax.rs.whiteboard.internal.cxf.sse;
-
-import java.lang.annotation.Annotation;
-import java.util.Queue;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.LockSupport;
-import java.util.logging.Logger;
-
-import javax.servlet.AsyncContext;
-import javax.ws.rs.container.AsyncResponse;
-import javax.ws.rs.ext.MessageBodyWriter;
-import javax.ws.rs.sse.OutboundSseEvent;
-import javax.ws.rs.sse.SseEventSink;
-
-import org.apache.cxf.common.logging.LogUtils;
-
-public class SseEventSinkImpl implements SseEventSink {
- private static final Annotation[] EMPTY_ANNOTATIONS = new Annotation [] {};
- private static final Logger LOG = LogUtils.getL7dLogger(SseEventSinkImpl.class);
- private static final int BUFFER_SIZE = 10000; // buffering 10000 messages
-
- private final AsyncContext ctx;
- private final MessageBodyWriter<OutboundSseEvent> writer;
- private final Queue<QueuedEvent> buffer;
- private final AtomicBoolean closed = new AtomicBoolean(false);
- private final AtomicBoolean dispatching = new AtomicBoolean(false);
-
- public SseEventSinkImpl(final MessageBodyWriter<OutboundSseEvent> writer,
- final AsyncResponse async, final AsyncContext ctx) {
-
- this.writer = writer;
- this.buffer = new ArrayBlockingQueue<>(BUFFER_SIZE);
- this.ctx = ctx;
-
- if (ctx == null) {
- throw new IllegalStateException("Unable to retrieve the AsyncContext for this request. "
- + "Is the Servlet configured properly?");
- }
-
- ctx.getResponse().setContentType(OutboundSseEventBodyWriter.SERVER_SENT_EVENTS);
- }
-
- public AsyncContext getAsyncContext() {
- return ctx;
- }
-
- @Override
- public void close() {
- if (closed.compareAndSet(false, true)) {
- // In case we are still dispatching, give the events the chance to be
- // sent over to the consumers. The good example would be sent(event) call,
- // immediately followed by the close() call.
- if (!awaitQueueToDrain(5, TimeUnit.SECONDS)) {
- LOG.warning("There are still SSE events the queue which may not be delivered (closing now)");
- }
-
- try {
- ctx.complete();
- } catch (final IllegalStateException ex) {
- LOG.warning("Failed to close the AsyncContext cleanly: " + ex.getMessage());
- }
- }
- }
-
- private boolean awaitQueueToDrain(int timeout, TimeUnit unit) {
- final long parkTime = unit.toNanos(timeout) / 20;
- int attempt = 0;
-
- while (dispatching.get() && ++attempt < 20) {
- LockSupport.parkNanos(parkTime);
- }
-
- return buffer.isEmpty();
- }
-
- @Override
- public boolean isClosed() {
- return closed.get();
- }
-
- @Override
- public CompletionStage<?> send(OutboundSseEvent event) {
- final CompletableFuture<?> future = new CompletableFuture<>();
-
- if (!closed.get() && writer != null) {
- if (buffer.offer(new QueuedEvent(event, future))) {
- if (dispatching.compareAndSet(false, true)) {
- ctx.start(this::dequeue);
- }
- } else {
- future.completeExceptionally(new IllegalStateException(
- "The buffer is full (10000), unable to queue SSE event for send"));
- }
- } else {
- future.completeExceptionally(new IllegalStateException(
- "The sink is already closed, unable to queue SSE event for send"));
- }
-
- return future;
- }
-
- private void dequeue() {
- try {
- while (true) {
- final QueuedEvent qeuedEvent = buffer.poll();
-
- // Nothing queued, release the thread
- if (qeuedEvent == null) {
- break;
- }
-
- final OutboundSseEvent event = qeuedEvent.event;
- final CompletableFuture<?> future = qeuedEvent.completion;
-
- try {
- writer.writeTo(event, event.getClass(), event.getGenericType(), EMPTY_ANNOTATIONS,
- event.getMediaType(), null, ctx.getResponse().getOutputStream());
- ctx.getResponse().flushBuffer();
- future.complete(null);
- } catch (final Exception ex) {
- future.completeExceptionally(ex);
- }
- }
- } finally {
- dispatching.set(false);
- }
- }
-
- private static class QueuedEvent {
- private final OutboundSseEvent event;
- private final CompletableFuture<?> completion;
-
- QueuedEvent(OutboundSseEvent event, CompletableFuture<?> completion) {
- this.event = event;
- this.completion = completion;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/4c13daf7/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseImpl.java
----------------------------------------------------------------------
diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseImpl.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseImpl.java
deleted file mode 100644
index b558120..0000000
--- a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseImpl.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.jax.rs.whiteboard.internal.cxf.sse;
-
-import javax.ws.rs.sse.OutboundSseEvent.Builder;
-import javax.ws.rs.sse.Sse;
-import javax.ws.rs.sse.SseBroadcaster;
-
-class SseImpl implements Sse {
- SseImpl() {
- }
-
- @Override
- public Builder newEventBuilder() {
- return new OutboundSseEventImpl.BuilderImpl();
- }
-
- @Override
- public SseBroadcaster newBroadcaster() {
- return new SseBroadcasterImpl();
- }
-}
http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/4c13daf7/jax-rs.whiteboard/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java
----------------------------------------------------------------------
diff --git a/jax-rs.whiteboard/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java b/jax-rs.whiteboard/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java
deleted file mode 100644
index eb3a8d2..0000000
--- a/jax-rs.whiteboard/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java
+++ /dev/null
@@ -1,321 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.impl;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import javax.ws.rs.ServiceUnavailableException;
-import javax.ws.rs.container.AsyncResponse;
-import javax.ws.rs.container.CompletionCallback;
-import javax.ws.rs.container.ConnectionCallback;
-import javax.ws.rs.container.TimeoutHandler;
-import javax.ws.rs.core.HttpHeaders;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.ResponseBuilder;
-
-import org.apache.cxf.continuations.Continuation;
-import org.apache.cxf.continuations.ContinuationCallback;
-import org.apache.cxf.continuations.ContinuationProvider;
-import org.apache.cxf.interceptor.Fault;
-import org.apache.cxf.jaxrs.utils.HttpUtils;
-import org.apache.cxf.message.Message;
-
-
-public class AsyncResponseImpl implements AsyncResponse, ContinuationCallback {
-
- private Continuation cont;
- private Message inMessage;
- private TimeoutHandler timeoutHandler;
- private volatile boolean initialSuspend;
- private volatile boolean cancelled;
- private volatile boolean done;
- private volatile boolean resumedByApplication;
- private volatile Long pendingTimeout;
-
- private List<CompletionCallback> completionCallbacks = new LinkedList<CompletionCallback>();
- private List<ConnectionCallback> connectionCallbacks = new LinkedList<ConnectionCallback>();
- private Throwable unmappedThrowable;
-
- public AsyncResponseImpl(Message inMessage) {
- inMessage.put(AsyncResponse.class, this);
- inMessage.getExchange().put(ContinuationCallback.class, this);
- this.inMessage = inMessage;
-
- initContinuation();
- }
-
- @Override
- public boolean resume(Object response) {
- return doResume(response);
- }
-
- @Override
- public boolean resume(Throwable response) {
- return doResume(response);
- }
-
- private boolean isCancelledOrNotSuspended() {
- return isCancelled() || !isSuspended();
- }
-
- private boolean doResume(Object response) {
- if (isCancelledOrNotSuspended()) {
- return false;
- }
- return doResumeFinal(response);
- }
- private synchronized boolean doResumeFinal(Object response) {
- inMessage.getExchange().put(AsyncResponse.class, this);
- cont.setObject(response);
- resumedByApplication = true;
- if (!initialSuspend) {
- cont.resume();
- } else {
- initialSuspend = false;
- }
- return true;
- }
-
- @Override
- public boolean cancel() {
- return doCancel(null);
- }
-
- @Override
- public boolean cancel(int retryAfter) {
- return doCancel(Integer.toString(retryAfter));
- }
-
- @Override
- public boolean cancel(Date retryAfter) {
- return doCancel(HttpUtils.getHttpDateFormat().format(retryAfter));
- }
-
- private boolean doCancel(String retryAfterHeader) {
- if (cancelled) {
- return true;
- }
- if (!isSuspended()) {
- return false;
- }
-
- cancelled = true;
- ResponseBuilder rb = Response.status(503);
- if (retryAfterHeader != null) {
- rb.header(HttpHeaders.RETRY_AFTER, retryAfterHeader);
- }
- doResumeFinal(rb.build());
- return cancelled;
- }
-
- @Override
- public boolean isSuspended() {
- if (cancelled || resumedByApplication) {
- return false;
- }
- return initialSuspend || cont.isPending();
- }
-
- @Override
- public synchronized boolean isCancelled() {
- return cancelled;
- }
-
- @Override
- public boolean isDone() {
- return done;
- }
-
- @Override
- public synchronized boolean setTimeout(long time, TimeUnit unit) throws IllegalStateException {
- if (isCancelledOrNotSuspended()) {
- return false;
- }
- setAsyncResponseOnExchange();
- long timeout = TimeUnit.MILLISECONDS.convert(time, unit);
- initialSuspend = false;
- if (!cont.isPending()) {
- cont.suspend(timeout);
- } else {
- pendingTimeout = timeout;
- cont.resume();
- }
- return true;
- }
-
- private void setAsyncResponseOnExchange() {
- inMessage.getExchange().put(AsyncResponse.class, this);
- }
-
- @Override
- public void setTimeoutHandler(TimeoutHandler handler) {
- timeoutHandler = handler;
- }
-
- @Override
- public Collection<Class<?>> register(Class<?> callback) throws NullPointerException {
- return register(callback, new Class<?>[]{}).get(callback);
- }
-
- @Override
- public Map<Class<?>, Collection<Class<?>>> register(Class<?> callback, Class<?>... callbacks)
- throws NullPointerException {
- try {
- Object[] extraCallbacks = new Object[callbacks.length];
- for (int i = 0; i < callbacks.length; i++) {
- extraCallbacks[i] = callbacks[i].newInstance();
- }
- return register(callback.newInstance(), extraCallbacks);
- } catch (NullPointerException e) {
- throw e;
- } catch (Throwable t) {
- return Collections.emptyMap();
- }
-
- }
-
- @Override
- public Collection<Class<?>> register(Object callback) throws NullPointerException {
- return register(callback, new Object[]{}).get(callback.getClass());
- }
-
- @Override
- public Map<Class<?>, Collection<Class<?>>> register(Object callback, Object... callbacks)
- throws NullPointerException {
- Map<Class<?>, Collection<Class<?>>> map =
- new HashMap<Class<?>, Collection<Class<?>>>();
-
- Object[] allCallbacks = new Object[1 + callbacks.length];
- allCallbacks[0] = callback;
- System.arraycopy(callbacks, 0, allCallbacks, 1, callbacks.length);
-
- for (int i = 0; i < allCallbacks.length; i++) {
- if (allCallbacks[i] == null) {
- throw new NullPointerException();
- }
- Class<?> callbackCls = allCallbacks[i].getClass();
- Collection<Class<?>> knownCallbacks = map.get(callbackCls);
- if (knownCallbacks == null) {
- knownCallbacks = new HashSet<Class<?>>();
- map.put(callbackCls, knownCallbacks);
- }
-
- if (allCallbacks[i] instanceof CompletionCallback) {
- knownCallbacks.add(CompletionCallback.class);
- completionCallbacks.add((CompletionCallback)allCallbacks[i]);
- } else if (allCallbacks[i] instanceof ConnectionCallback) {
- knownCallbacks.add(ConnectionCallback.class);
- connectionCallbacks.add((ConnectionCallback)allCallbacks[i]);
- }
- }
- return map;
- }
-
- @Override
- public void onComplete() {
- done = true;
- updateCompletionCallbacks(unmappedThrowable);
- }
-
- @Override
- public void onError(Throwable error) {
- updateCompletionCallbacks(error);
- }
-
- private void updateCompletionCallbacks(Throwable error) {
- Throwable actualError = error instanceof Fault ? ((Fault)error).getCause() : error;
- for (CompletionCallback completionCallback : completionCallbacks) {
- completionCallback.onComplete(actualError);
- }
- }
-
- @Override
- public void onDisconnect() {
- for (ConnectionCallback connectionCallback : connectionCallbacks) {
- connectionCallback.onDisconnect(this);
- }
- }
-
- public synchronized boolean suspendContinuationIfNeeded() {
- if (!resumedByApplication && !isDone() && !cont.isPending() && !cont.isResumed()) {
- cont.suspend(AsyncResponse.NO_TIMEOUT);
- initialSuspend = false;
- return true;
- }
- return false;
- }
-
- @SuppressWarnings("resource") // Response that is built here shouldn't be closed here
- public Object getResponseObject() {
- Object obj = cont.getObject();
- if (!(obj instanceof Response) && !(obj instanceof Throwable)) {
- if (obj == null) {
- obj = Response.noContent().build();
- } else {
- obj = Response.ok().entity(obj).build();
- }
- }
- return obj;
- }
-
- public boolean isResumedByApplication() {
- return resumedByApplication;
- }
-
- public synchronized void handleTimeout() {
- if (!resumedByApplication) {
- if (pendingTimeout != null) {
- setAsyncResponseOnExchange();
- cont.suspend(pendingTimeout);
- pendingTimeout = null;
- } else if (timeoutHandler != null) {
- timeoutHandler.handleTimeout(this);
- } else {
- cont.setObject(new ServiceUnavailableException());
- }
- }
- }
-
- private void initContinuation() {
- ContinuationProvider provider =
- (ContinuationProvider)inMessage.get(ContinuationProvider.class.getName());
- cont = provider.getContinuation();
- initialSuspend = true;
- }
-
- public void prepareContinuation() {
- initContinuation();
- }
-
- public void setUnmappedThrowable(Throwable t) {
- unmappedThrowable = t;
- }
- public void reset() {
- cont.reset();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/4c13daf7/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index bbb9537..51b77bd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -36,7 +36,7 @@
<packaging>pom</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <cxf.version>3.2.4</cxf.version>
+ <cxf.version>3.2.5</cxf.version>
<bnd.version>4.0.0</bnd.version>
</properties>