You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2017/04/30 13:24:31 UTC
cxf git commit: Updating to JAX-RS API 2.1 milestone 7. Keeping the
NIO implementation but under CXF umbrella
Repository: cxf
Updated Branches:
refs/heads/master a36af6323 -> 09121b3ab
Updating to JAX-RS API 2.1 milestone 7. Keeping the NIO implementation but under CXF umbrella
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/09121b3a
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/09121b3a
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/09121b3a
Branch: refs/heads/master
Commit: 09121b3abf54697f576d4d067190ce696f2f0338
Parents: a36af63
Author: reta <dr...@gmail.com>
Authored: Sat Apr 29 19:12:26 2017 -0400
Committer: reta <dr...@gmail.com>
Committed: Sat Apr 29 21:34:08 2017 -0400
----------------------------------------------------------------------
.../demo/jaxrs/sse/StatsRestServiceImpl.java | 2 +-
.../demo/jaxrs/sse/StatsRestServiceImpl.java | 16 ++--
.../demo/jaxrs/sse/StatsRestServiceImpl.java | 16 ++--
parent/pom.xml | 2 +-
.../org/apache/cxf/jaxrs/impl/RequestImpl.java | 38 +-------
.../cxf/jaxrs/impl/ResponseBuilderImpl.java | 14 ---
.../cxf/jaxrs/impl/tl/ThreadLocalRequest.java | 24 -----
.../cxf/jaxrs/nio/DelegatingNioInputStream.java | 1 -
.../jaxrs/nio/DelegatingNioOutputStream.java | 2 -
.../cxf/jaxrs/nio/NioCompletionHandler.java | 25 +++++
.../apache/cxf/jaxrs/nio/NioErrorHandler.java | 30 ++++++
.../apache/cxf/jaxrs/nio/NioInputStream.java | 34 +++++++
.../apache/cxf/jaxrs/nio/NioOutputStream.java | 28 ++++++
.../org/apache/cxf/jaxrs/nio/NioReadEntity.java | 22 ++++-
.../apache/cxf/jaxrs/nio/NioReaderHandler.java | 35 +++++++
.../apache/cxf/jaxrs/nio/NioWriteEntity.java | 3 -
.../apache/cxf/jaxrs/nio/NioWriterHandler.java | 34 +++++++
.../cxf/jaxrs/provider/BinaryDataProvider.java | 4 +-
.../cxf/jaxrs/client/AsyncInvokerImpl.java | 19 ++++
.../client/CompletionStageRxInvokerImpl.java | 15 +++
.../cxf/jaxrs/client/SyncInvokerImpl.java | 15 +++
.../jaxrs/client/spec/ClientBuilderImpl.java | 11 +++
.../client/spec/InvocationBuilderImpl.java | 28 ++----
.../rx/client/ObservableRxInvokerImpl.java | 15 +++
.../cxf/jaxrs/sse/SseBroadcasterImpl.java | 50 +++++-----
.../cxf/jaxrs/sse/SseUnboundedSubscription.java | 72 --------------
.../atmosphere/SseAtmosphereEventSinkImpl.java | 56 +++++------
.../cxf/systest/jaxrs/nio/NioBookStore.java | 99 ++++++++++----------
.../apache/cxf/systest/jaxrs/sse/BookStore.java | 10 +-
.../cxf/systest/jaxrs/sse/BookStore2.java | 10 +-
30 files changed, 417 insertions(+), 313 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java b/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
index 0aa943c..63eae32 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
+++ b/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
@@ -64,7 +64,7 @@ public class StatsRestServiceImpl {
@Path("sse")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void stats(@Context SseEventSink sink) {
- broadcaster.subscribe(sink);
+ broadcaster.register(sink);
}
private static OutboundSseEvent createStatsEvent(final OutboundSseEvent.Builder builder, final long eventId) {
http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java b/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
index f69946f..28730a3 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
+++ b/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
@@ -53,21 +53,21 @@ public class StatsRestServiceImpl {
public void run() {
try {
final Builder builder = sse.newEventBuilder();
- sink.onNext(createStatsEvent(builder.name("stats"), 1));
+ sink.send(createStatsEvent(builder.name("stats"), 1));
Thread.sleep(1000);
- sink.onNext(createStatsEvent(builder.name("stats"), 2));
+ sink.send(createStatsEvent(builder.name("stats"), 2));
Thread.sleep(1000);
- sink.onNext(createStatsEvent(builder.name("stats"), 3));
+ sink.send(createStatsEvent(builder.name("stats"), 3));
Thread.sleep(1000);
- sink.onNext(createStatsEvent(builder.name("stats"), 4));
+ sink.send(createStatsEvent(builder.name("stats"), 4));
Thread.sleep(1000);
- sink.onNext(createStatsEvent(builder.name("stats"), 5));
+ sink.send(createStatsEvent(builder.name("stats"), 5));
Thread.sleep(1000);
- sink.onNext(createStatsEvent(builder.name("stats"), 6));
+ sink.send(createStatsEvent(builder.name("stats"), 6));
Thread.sleep(1000);
- sink.onNext(createStatsEvent(builder.name("stats"), 7));
+ sink.send(createStatsEvent(builder.name("stats"), 7));
Thread.sleep(1000);
- sink.onNext(createStatsEvent(builder.name("stats"), 8));
+ sink.send(createStatsEvent(builder.name("stats"), 8));
sink.close();
} catch (final Exception e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/distribution/src/main/release/samples/jax_rs/sse_tomcat/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/sse_tomcat/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java b/distribution/src/main/release/samples/jax_rs/sse_tomcat/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
index 72ab7be..f2d8ff3 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_tomcat/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
+++ b/distribution/src/main/release/samples/jax_rs/sse_tomcat/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
@@ -50,21 +50,21 @@ public class StatsRestServiceImpl {
public void run() {
try {
final Builder builder = sse.newEventBuilder();
- sink.onNext(createStatsEvent(builder.name("stats"), 1));
+ sink.send(createStatsEvent(builder.name("stats"), 1));
Thread.sleep(1000);
- sink.onNext(createStatsEvent(builder.name("stats"), 2));
+ sink.send(createStatsEvent(builder.name("stats"), 2));
Thread.sleep(1000);
- sink.onNext(createStatsEvent(builder.name("stats"), 3));
+ sink.send(createStatsEvent(builder.name("stats"), 3));
Thread.sleep(1000);
- sink.onNext(createStatsEvent(builder.name("stats"), 4));
+ sink.send(createStatsEvent(builder.name("stats"), 4));
Thread.sleep(1000);
- sink.onNext(createStatsEvent(builder.name("stats"), 5));
+ sink.send(createStatsEvent(builder.name("stats"), 5));
Thread.sleep(1000);
- sink.onNext(createStatsEvent(builder.name("stats"), 6));
+ sink.send(createStatsEvent(builder.name("stats"), 6));
Thread.sleep(1000);
- sink.onNext(createStatsEvent(builder.name("stats"), 7));
+ sink.send(createStatsEvent(builder.name("stats"), 7));
Thread.sleep(1000);
- sink.onNext(createStatsEvent(builder.name("stats"), 8));
+ sink.send(createStatsEvent(builder.name("stats"), 8));
sink.close();
} catch (final Exception e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index d1f73fd..5b04d9b 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -110,7 +110,7 @@
<cxf.geronimo.transaction.version>3.1.4</cxf.geronimo.transaction.version>
<cxf.jasypt.bundle.version>1.9.0_1</cxf.jasypt.bundle.version>
<cxf.javassist.version>3.19.0-GA</cxf.javassist.version>
- <cxf.javax.ws.rs.version>2.1-m05</cxf.javax.ws.rs.version>
+ <cxf.javax.ws.rs.version>2.1-m07</cxf.javax.ws.rs.version>
<cxf.jaxb.version>2.2.11</cxf.jaxb.version>
<cxf.jaxb.impl.version>${cxf.jaxb.version}</cxf.jaxb.impl.version>
<cxf.jaxb.core.version>${cxf.jaxb.version}</cxf.jaxb.core.version>
http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/RequestImpl.java
----------------------------------------------------------------------
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/RequestImpl.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/RequestImpl.java
index aa64910..bb99409 100644
--- a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/RequestImpl.java
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/RequestImpl.java
@@ -19,7 +19,6 @@
package org.apache.cxf.jaxrs.impl;
-import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Collections;
@@ -29,35 +28,28 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
-import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.EntityTag;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.NioCompletionHandler;
-import javax.ws.rs.core.NioErrorHandler;
-import javax.ws.rs.core.NioReaderHandler;
import javax.ws.rs.core.Request;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.Variant;
import org.apache.cxf.common.util.StringUtils;
-import org.apache.cxf.jaxrs.nio.NioReadEntity;
-import org.apache.cxf.jaxrs.nio.NioReadListenerImpl;
import org.apache.cxf.jaxrs.utils.HttpUtils;
import org.apache.cxf.jaxrs.utils.JAXRSUtils;
import org.apache.cxf.message.Message;
import org.apache.cxf.phase.PhaseInterceptorChain;
-import org.apache.cxf.transport.http.AbstractHTTPDestination;
/**
* TODO : deal with InvalidStateExceptions
*
*/
-public class RequestImpl implements Request {
+public class RequestImpl implements Request {
private final Message m;
private final HttpHeaders headers;
@@ -390,32 +382,4 @@ public class RequestImpl implements Request {
return 0;
}
}
-
- @Override
- public void entity(NioReaderHandler reader) {
- entity(reader, in -> { }, throwable -> { });
- }
-
- @Override
- public void entity(NioReaderHandler reader, NioCompletionHandler completion) {
- entity(reader, completion, throwable -> { });
- }
-
- @Override
- public void entity(NioReaderHandler reader, NioErrorHandler error) {
- entity(reader, in -> { }, error);
- }
-
- @Override
- public void entity(NioReaderHandler reader, NioCompletionHandler completion, NioErrorHandler error) {
- try {
- final HttpServletRequest request = (HttpServletRequest)m.get(AbstractHTTPDestination.HTTP_REQUEST);
- if (request != null) {
- final NioReadEntity entity = new NioReadEntity(reader, completion, error);
- request.getInputStream().setReadListener(new NioReadListenerImpl(entity, request.getInputStream()));
- }
- } catch (final IOException ex) {
- throw new RuntimeException("Unable to initialize NIO entity", ex);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/ResponseBuilderImpl.java
----------------------------------------------------------------------
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/ResponseBuilderImpl.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/ResponseBuilderImpl.java
index 6854c2b..0d87387 100644
--- a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/ResponseBuilderImpl.java
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/ResponseBuilderImpl.java
@@ -34,14 +34,11 @@ import javax.ws.rs.core.Link;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.NewCookie;
-import javax.ws.rs.core.NioErrorHandler;
-import javax.ws.rs.core.NioWriterHandler;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.UriInfo;
import javax.ws.rs.core.Variant;
-import org.apache.cxf.jaxrs.nio.NioWriteEntity;
import org.apache.cxf.message.Message;
import org.apache.cxf.phase.PhaseInterceptorChain;
@@ -317,15 +314,4 @@ public class ResponseBuilderImpl extends ResponseBuilder implements Cloneable {
}
return variants(Arrays.asList(variants));
}
-
- @Override
- public ResponseBuilder entity(NioWriterHandler writerHandler) {
- return entity(writerHandler, (NioErrorHandler)null);
- }
-
- @Override
- public ResponseBuilder entity(NioWriterHandler writerHandler, NioErrorHandler errorHandler) {
- entity = new NioWriteEntity(writerHandler, errorHandler);
- return this;
- }
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/tl/ThreadLocalRequest.java
----------------------------------------------------------------------
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/tl/ThreadLocalRequest.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/tl/ThreadLocalRequest.java
index 20391d5..0ce54c9 100644
--- a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/tl/ThreadLocalRequest.java
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/tl/ThreadLocalRequest.java
@@ -23,9 +23,6 @@ import java.util.Date;
import java.util.List;
import javax.ws.rs.core.EntityTag;
-import javax.ws.rs.core.NioCompletionHandler;
-import javax.ws.rs.core.NioErrorHandler;
-import javax.ws.rs.core.NioReaderHandler;
import javax.ws.rs.core.Request;
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.Variant;
@@ -56,25 +53,4 @@ public class ThreadLocalRequest extends AbstractThreadLocalProxy<Request>
public ResponseBuilder evaluatePreconditions() {
return get().evaluatePreconditions();
}
-
- @Override
- public void entity(NioReaderHandler reader) {
- get().entity(reader);
- }
-
- @Override
- public void entity(NioReaderHandler reader, NioCompletionHandler completion) {
- get().entity(reader, completion);
- }
-
- @Override
- public void entity(NioReaderHandler reader, NioErrorHandler error) {
- get().entity(reader, error);
- }
-
- @Override
- public void entity(NioReaderHandler reader, NioCompletionHandler completion, NioErrorHandler error) {
- get().entity(reader, completion, error);
- }
-
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/DelegatingNioInputStream.java
----------------------------------------------------------------------
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/DelegatingNioInputStream.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/DelegatingNioInputStream.java
index b2c5c32..65c97be 100644
--- a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/DelegatingNioInputStream.java
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/DelegatingNioInputStream.java
@@ -21,7 +21,6 @@ package org.apache.cxf.jaxrs.nio;
import java.io.IOException;
import javax.servlet.ServletInputStream;
-import javax.ws.rs.core.NioInputStream;
public class DelegatingNioInputStream extends NioInputStream {
private final ServletInputStream in;
http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/DelegatingNioOutputStream.java
----------------------------------------------------------------------
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/DelegatingNioOutputStream.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/DelegatingNioOutputStream.java
index 18d95a1..ec066be 100644
--- a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/DelegatingNioOutputStream.java
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/DelegatingNioOutputStream.java
@@ -21,8 +21,6 @@ package org.apache.cxf.jaxrs.nio;
import java.io.IOException;
import java.io.OutputStream;
-import javax.ws.rs.core.NioOutputStream;
-
public class DelegatingNioOutputStream extends NioOutputStream {
private final OutputStream out;
http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioCompletionHandler.java
----------------------------------------------------------------------
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioCompletionHandler.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioCompletionHandler.java
new file mode 100644
index 0000000..534dca9
--- /dev/null
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioCompletionHandler.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.nio;
+
+@FunctionalInterface
+public interface NioCompletionHandler {
+ void complete(NioInputStream in);
+}
+
http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioErrorHandler.java
----------------------------------------------------------------------
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioErrorHandler.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioErrorHandler.java
new file mode 100644
index 0000000..e7a538f
--- /dev/null
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioErrorHandler.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.nio;
+
+@FunctionalInterface
+public interface NioErrorHandler {
+ /**
+ * Method called when an exception or error occurred.
+ *
+ * @param throwable the error or exception encountered.
+ */
+ void error(Throwable throwable) throws Throwable;
+}
+
http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioInputStream.java
----------------------------------------------------------------------
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioInputStream.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioInputStream.java
new file mode 100644
index 0000000..f75b890
--- /dev/null
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioInputStream.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.nio;
+
+import java.io.InputStream;
+
+/**
+ * Class NioInputStream.
+ */
+public abstract class NioInputStream extends InputStream {
+ /**
+ * Checks if the input stream has been consumed.
+ *
+ * @return outcome of test.
+ */
+ public abstract boolean isFinished();
+}
+
http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioOutputStream.java
----------------------------------------------------------------------
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioOutputStream.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioOutputStream.java
new file mode 100644
index 0000000..3ac1fdf
--- /dev/null
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioOutputStream.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.nio;
+
+import java.io.OutputStream;
+
+/**
+ * Class NioOutputStream.
+ */
+public abstract class NioOutputStream extends OutputStream {
+}
+
http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioReadEntity.java
----------------------------------------------------------------------
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioReadEntity.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioReadEntity.java
index 754be40..8804401 100644
--- a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioReadEntity.java
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioReadEntity.java
@@ -18,9 +18,13 @@
*/
package org.apache.cxf.jaxrs.nio;
-import javax.ws.rs.core.NioCompletionHandler;
-import javax.ws.rs.core.NioErrorHandler;
-import javax.ws.rs.core.NioReaderHandler;
+import java.io.IOException;
+
+import javax.servlet.http.HttpServletRequest;
+
+import org.apache.cxf.jaxrs.utils.JAXRSUtils;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.transport.http.AbstractHTTPDestination;
public class NioReadEntity {
private final NioReaderHandler reader;
@@ -31,6 +35,18 @@ public class NioReadEntity {
this.reader = reader;
this.completion = completion;
this.error = error;
+
+ try {
+ final Message m = JAXRSUtils.getCurrentMessage();
+ if (m != null) {
+ final HttpServletRequest request = (HttpServletRequest)m.get(AbstractHTTPDestination.HTTP_REQUEST);
+ if (request != null) {
+ request.getInputStream().setReadListener(new NioReadListenerImpl(this, request.getInputStream()));
+ }
+ }
+ } catch (final IOException ex) {
+ throw new RuntimeException("Unable to initialize NIO entity", ex);
+ }
}
public NioReaderHandler getReader() {
http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioReaderHandler.java
----------------------------------------------------------------------
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioReaderHandler.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioReaderHandler.java
new file mode 100644
index 0000000..912684f
--- /dev/null
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioReaderHandler.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.nio;
+
+/**
+ * Class NioReaderHandler.
+ */
+@FunctionalInterface
+public interface NioReaderHandler {
+ /**
+ * Called every time it is possible to read from the input stream without blocking. The last
+ * time this method is called, the value of {@code in.isFinished()} must be {@code true} to
+ * indicate that all the stream has been read.
+ *
+ * @param in input stream.
+ */
+ void read(NioInputStream in);
+}
+
http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioWriteEntity.java
----------------------------------------------------------------------
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioWriteEntity.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioWriteEntity.java
index 2df0795..1f09f92 100644
--- a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioWriteEntity.java
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioWriteEntity.java
@@ -18,9 +18,6 @@
*/
package org.apache.cxf.jaxrs.nio;
-import javax.ws.rs.core.NioErrorHandler;
-import javax.ws.rs.core.NioWriterHandler;
-
public final class NioWriteEntity {
private final NioWriterHandler writer;
private final NioErrorHandler error;
http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioWriterHandler.java
----------------------------------------------------------------------
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioWriterHandler.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioWriterHandler.java
new file mode 100644
index 0000000..920b3a1
--- /dev/null
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioWriterHandler.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.nio;
+
+/**
+ * Class NioWriterHandler.
+ */
+@FunctionalInterface
+public interface NioWriterHandler {
+ /**
+ * Method called when it is possible to write some data without blocking.
+ *
+ * @param out output stream.
+ * @return {@code true} if there is more data to write, {@code false} otherwise.
+ */
+ boolean write(NioOutputStream out);
+}
+
http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/provider/BinaryDataProvider.java
----------------------------------------------------------------------
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/provider/BinaryDataProvider.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/provider/BinaryDataProvider.java
index 0b8ad53..0abfd4b 100644
--- a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/provider/BinaryDataProvider.java
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/provider/BinaryDataProvider.java
@@ -42,8 +42,6 @@ import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
-import javax.ws.rs.core.NioOutputStream;
-import javax.ws.rs.core.NioWriterHandler;
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.ext.MessageBodyReader;
import javax.ws.rs.ext.MessageBodyWriter;
@@ -57,8 +55,10 @@ import org.apache.cxf.helpers.FileUtils;
import org.apache.cxf.helpers.IOUtils;
import org.apache.cxf.jaxrs.impl.HttpHeadersImpl;
import org.apache.cxf.jaxrs.nio.DelegatingNioOutputStream;
+import org.apache.cxf.jaxrs.nio.NioOutputStream;
import org.apache.cxf.jaxrs.nio.NioWriteEntity;
import org.apache.cxf.jaxrs.nio.NioWriteListenerImpl;
+import org.apache.cxf.jaxrs.nio.NioWriterHandler;
import org.apache.cxf.jaxrs.utils.AnnotationUtils;
import org.apache.cxf.jaxrs.utils.ExceptionUtils;
import org.apache.cxf.jaxrs.utils.JAXRSUtils;
http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AsyncInvokerImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AsyncInvokerImpl.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AsyncInvokerImpl.java
index 9be13ba..b3a2e99 100644
--- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AsyncInvokerImpl.java
+++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AsyncInvokerImpl.java
@@ -214,5 +214,24 @@ public class AsyncInvokerImpl implements AsyncInvoker {
null,
callback);
}
+ @Override
+ public Future<Response> patch(Entity<?> entity) {
+ return method(HttpMethod.PATCH, entity);
+ }
+
+ @Override
+ public <T> Future<T> patch(Entity<?> entity, Class<T> responseType) {
+ return method(HttpMethod.PATCH, entity, responseType);
+ }
+
+ @Override
+ public <T> Future<T> patch(Entity<?> entity, GenericType<T> responseType) {
+ return method(HttpMethod.PATCH, entity, responseType);
+ }
+
+ @Override
+ public <T> Future<T> patch(Entity<?> entity, InvocationCallback<T> callback) {
+ return method(HttpMethod.PATCH, entity, callback);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/CompletionStageRxInvokerImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/CompletionStageRxInvokerImpl.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/CompletionStageRxInvokerImpl.java
index f60c45b..822948f 100644
--- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/CompletionStageRxInvokerImpl.java
+++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/CompletionStageRxInvokerImpl.java
@@ -160,4 +160,19 @@ public class CompletionStageRxInvokerImpl implements CompletionStageRxInvoker {
return wc.doInvokeAsyncStage(name, null, responseType.getRawType(), responseType.getType(), ex);
}
+ @Override
+ public CompletionStage<Response> patch(Entity<?> entity) {
+ return method(HttpMethod.PATCH, entity);
+ }
+
+ @Override
+ public <T> CompletionStage<T> patch(Entity<?> entity, Class<T> responseType) {
+ return method(HttpMethod.PATCH, entity, responseType);
+ }
+
+ @Override
+ public <T> CompletionStage<T> patch(Entity<?> entity, GenericType<T> responseType) {
+ return method(HttpMethod.PATCH, entity, responseType);
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/SyncInvokerImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/SyncInvokerImpl.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/SyncInvokerImpl.java
index 78ebd37..57b57e4 100644
--- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/SyncInvokerImpl.java
+++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/SyncInvokerImpl.java
@@ -158,4 +158,19 @@ public class SyncInvokerImpl implements SyncInvoker {
public WebClient getWebClient() {
return wc;
}
+
+ @Override
+ public Response patch(Entity<?> entity) {
+ return method(HttpMethod.PATCH, entity);
+ }
+
+ @Override
+ public <T> T patch(Entity<?> entity, Class<T> responseType) {
+ return method(HttpMethod.PATCH, entity, responseType);
+ }
+
+ @Override
+ public <T> T patch(Entity<?> entity, GenericType<T> responseType) {
+ return method(HttpMethod.PATCH, entity, responseType);
+ }
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/ClientBuilderImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/ClientBuilderImpl.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/ClientBuilderImpl.java
index 79a968e..d2befac 100644
--- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/ClientBuilderImpl.java
+++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/ClientBuilderImpl.java
@@ -20,6 +20,8 @@ package org.apache.cxf.jaxrs.client.spec;
import java.security.KeyStore;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.KeyManagerFactory;
@@ -148,4 +150,13 @@ public class ClientBuilderImpl extends ClientBuilder {
return this;
}
+ @Override
+ public ClientBuilder executorService(ExecutorService executorService) {
+ return configImpl.property("executorService", executorService);
+ }
+
+ @Override
+ public ClientBuilder scheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
+ return configImpl.property("scheduledExecutorService", scheduledExecutorService);
+ }
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java
index 68f4be6..62219b5 100644
--- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java
+++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java
@@ -32,7 +32,6 @@ import javax.ws.rs.client.Entity;
import javax.ws.rs.client.Invocation;
import javax.ws.rs.client.Invocation.Builder;
import javax.ws.rs.client.InvocationCallback;
-import javax.ws.rs.client.NioInvoker;
import javax.ws.rs.client.RxInvoker;
import javax.ws.rs.client.SyncInvoker;
import javax.ws.rs.core.CacheControl;
@@ -377,34 +376,27 @@ public class InvocationBuilderImpl implements Invocation.Builder {
@Override
public CompletionStageRxInvoker rx() {
- return rx((ExecutorService)null);
+ return webClient.rx((ExecutorService)null);
}
+ @SuppressWarnings("rawtypes")
@Override
- public CompletionStageRxInvoker rx(ExecutorService executorService) {
- // TODO: At the moment we still delegate if possible to the async HTTP conduit.
- // Investigate if letting the CompletableFuture thread pool deal with the sync invocation
- // is indeed more effective
-
- return webClient.rx(executorService);
+ public <T extends RxInvoker> T rx(Class<T> rxCls) {
+ return webClient.rx(rxCls, (ExecutorService)null);
}
-
- @SuppressWarnings("rawtypes")
@Override
- public <T extends RxInvoker> T rx(Class<T> rxCls) {
- return rx(rxCls, (ExecutorService)null);
+ public Response patch(Entity<?> entity) {
+ return sync.patch(entity);
}
- @SuppressWarnings("rawtypes")
@Override
- public <T extends RxInvoker> T rx(Class<T> rxCls, ExecutorService executorService) {
- return webClient.rx(rxCls, executorService);
+ public <T> T patch(Entity<?> entity, Class<T> responseType) {
+ return sync.patch(entity, responseType);
}
@Override
- public NioInvoker nio() {
- // TODO: Implementation required (JAX-RS 2.1)
- return null;
+ public <T> T patch(Entity<?> entity, GenericType<T> responseType) {
+ return sync.patch(entity, responseType);
}
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvokerImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvokerImpl.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvokerImpl.java
index 8088f85..0a6033c 100644
--- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvokerImpl.java
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvokerImpl.java
@@ -171,4 +171,19 @@ public class ObservableRxInvokerImpl implements ObservableRxInvoker {
wc.prepareAsyncClient(httpMethod, body, null, null, respClass, outType, cb);
return cb.getObservable();
}
+
+ @Override
+ public Observable<Response> patch(Entity<?> entity) {
+ return method(HttpMethod.PATCH, entity);
+ }
+
+ @Override
+ public <T> Observable<T> patch(Entity<?> entity, Class<T> responseType) {
+ return method(HttpMethod.PATCH, entity, responseType);
+ }
+
+ @Override
+ public <T> Observable<T> patch(Entity<?> entity, GenericType<T> responseType) {
+ return method(HttpMethod.PATCH, entity, responseType);
+ }
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
index 8a24369..3884254 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
@@ -18,68 +18,62 @@
*/
package org.apache.cxf.jaxrs.sse;
-import java.util.Map;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
-import javax.ws.rs.Flow;
-import javax.ws.rs.Flow.Subscriber;
import javax.ws.rs.sse.OutboundSseEvent;
import javax.ws.rs.sse.SseBroadcaster;
+import javax.ws.rs.sse.SseEventSink;
public class SseBroadcasterImpl implements SseBroadcaster {
- private final Map<Flow.Subscriber<? super OutboundSseEvent>, SseUnboundedSubscription> subscribers =
- new ConcurrentHashMap<>();
+ private final Set<SseEventSink> subscribers = new CopyOnWriteArraySet<>();
- private final Set<Consumer<Subscriber<? super OutboundSseEvent>>> closers =
+ private final Set<Consumer<SseEventSink>> closers =
new CopyOnWriteArraySet<>();
- private final Set<BiConsumer<Subscriber<? super OutboundSseEvent>, Throwable>> exceptioners =
+ private final Set<BiConsumer<SseEventSink, Throwable>> exceptioners =
new CopyOnWriteArraySet<>();
@Override
- public void subscribe(Flow.Subscriber<? super OutboundSseEvent> subscriber) {
- try {
- if (!subscribers.containsKey(subscriber)) {
- final SseUnboundedSubscription subscription = new SseUnboundedSubscription(subscriber);
- if (subscribers.putIfAbsent(subscriber, subscription) == null) {
- subscriber.onSubscribe(subscription);
- }
- }
- } catch (final Exception ex) {
- subscriber.onError(ex);
- }
+ public void register(SseEventSink sink) {
+ subscribers.add(sink);
}
@Override
- public void broadcast(OutboundSseEvent event) {
- for (Map.Entry<Flow.Subscriber<? super OutboundSseEvent>, SseUnboundedSubscription> entry
- : subscribers.entrySet()) {
+ public CompletionStage<?> broadcast(OutboundSseEvent event) {
+ final Collection<CompletableFuture<?>> futures = new ArrayList<>();
+
+ for (SseEventSink sink: subscribers) {
try {
- entry.getValue().send(event);
+ futures.add(sink.send(event).toCompletableFuture());
} catch (final Exception ex) {
- exceptioners.forEach(exceptioner -> exceptioner.accept(entry.getKey(), ex));
+ exceptioners.forEach(exceptioner -> exceptioner.accept(sink, ex));
}
}
+
+ return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
}
@Override
- public void onClose(Consumer<Subscriber<? super OutboundSseEvent>> subscriber) {
+ public void onClose(Consumer<SseEventSink> subscriber) {
closers.add(subscriber);
}
@Override
- public void onError(BiConsumer<Subscriber<? super OutboundSseEvent>, Throwable> exceptioner) {
+ public void onError(BiConsumer<SseEventSink, Throwable> exceptioner) {
exceptioners.add(exceptioner);
}
@Override
public void close() {
- subscribers.keySet().forEach(subscriber -> {
- subscriber.onComplete();
+ subscribers.forEach(subscriber -> {
+ subscriber.close();
closers.forEach(closer -> closer.accept(subscriber));
});
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseUnboundedSubscription.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseUnboundedSubscription.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseUnboundedSubscription.java
deleted file mode 100644
index 92f24ab..0000000
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseUnboundedSubscription.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.sse;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import javax.ws.rs.Flow;
-import javax.ws.rs.Flow.Subscription;
-import javax.ws.rs.sse.OutboundSseEvent;
-
-class SseUnboundedSubscription implements Subscription {
- // Has subscription been cancelled or not?
- private boolean cancelled;
- // Current demand: what has been requested but not yet delivered
- private long demand;
- private final BlockingQueue<OutboundSseEvent> buffer = new LinkedBlockingQueue<>();
- private final Flow.Subscriber<? super OutboundSseEvent> subscriber;
-
- SseUnboundedSubscription(Flow.Subscriber<? super OutboundSseEvent> subscriber) {
- this.subscriber = subscriber;
- }
-
- public void request(long n) {
- if (demand + n < 1) {
- // Effectively unbounded demand
- demand = Long.MAX_VALUE;
- send();
- } else {
- // Here we record the downstream demand
- demand += n;
- send();
- }
- }
-
- @Override
- public void cancel() {
- cancelled = true;
- }
-
- public void send(OutboundSseEvent event) throws InterruptedException {
- if (!cancelled && buffer.offer(event)) {
- send();
- }
- }
-
- private void send() {
- while (!cancelled && demand > 0 && !buffer.isEmpty()) {
- final OutboundSseEvent event = buffer.poll();
- if (event != null) {
- subscriber.onNext(event);
- --demand;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkImpl.java
index 268e9d2..a79eff7 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkImpl.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkImpl.java
@@ -22,13 +22,13 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;
-import javax.ws.rs.Flow.Subscription;
import javax.ws.rs.ext.MessageBodyWriter;
import javax.ws.rs.sse.OutboundSseEvent;
import javax.ws.rs.sse.SseEventSink;
@@ -97,7 +97,9 @@ public class SseAtmosphereEventSinkImpl implements SseEventSink {
}
@Override
- public void onNext(OutboundSseEvent event) {
+ public CompletionStage<?> send(OutboundSseEvent event) {
+ final CompletableFuture<?> future = new CompletableFuture<>();
+
if (!closed && writer != null) {
try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
writer.writeTo(event, event.getClass(), null, new Annotation [] {}, event.getMediaType(), null, os);
@@ -105,41 +107,27 @@ public class SseAtmosphereEventSinkImpl implements SseEventSink {
// Atmosphere broadcasts asynchronously which is acceptable in most cases.
// Unfortunately, calling close() may lead to response stream being closed
// while there are still some SSE delivery scheduled.
- final Future<Object> future = resource
- .getBroadcaster()
- .broadcast(os.toString(StandardCharsets.UTF_8.name()));
-
- try {
- if (!future.isDone()) {
- // Let us wait at least 200 milliseconds before returning to ensure
- // that SSE had the opportunity to be delivered.
- LOG.fine("Waiting 200ms to ensure SSE Atmosphere response is delivered");
- future.get(200, TimeUnit.MILLISECONDS);
- }
- } catch (final ExecutionException | InterruptedException ex) {
- throw new IOException(ex);
- } catch (final TimeoutException ex) {
- LOG.warning("SSE Atmosphere response was not delivered within default timeout");
- }
+ return CompletableFuture.completedFuture(
+ resource
+ .getBroadcaster()
+ .broadcast(os.toString(StandardCharsets.UTF_8.name()))
+ .get(1, TimeUnit.SECONDS)
+ );
} catch (final IOException ex) {
LOG.warning("While writing the SSE event, an exception was raised: " + ex);
+ future.completeExceptionally(ex);
+ } catch (final ExecutionException | InterruptedException ex) {
+ LOG.warning("SSE Atmosphere response was not delivered");
+ future.completeExceptionally(ex);
+ } catch (final TimeoutException ex) {
+ LOG.warning("SSE Atmosphere response was not delivered within default timeout");
+ future.completeExceptionally(ex);
}
+ } else {
+ future.complete(null);
}
- }
-
- @Override
- public void onError(Throwable throwable) {
- // TODO: Should we close the response?
- }
-
- @Override
- public void onComplete() {
- close();
- }
-
- @Override
- public void onSubscribe(Subscription subscription) {
- subscription.request(Long.MAX_VALUE);
+
+ return future;
}
@Override
http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/nio/NioBookStore.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/nio/NioBookStore.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/nio/NioBookStore.java
index 62dda23..1498653 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/nio/NioBookStore.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/nio/NioBookStore.java
@@ -39,6 +39,8 @@ import javax.ws.rs.core.Response;
import org.apache.cxf.annotations.UseNio;
import org.apache.cxf.helpers.IOUtils;
+import org.apache.cxf.jaxrs.nio.NioReadEntity;
+import org.apache.cxf.jaxrs.nio.NioWriteEntity;
@Path("/bookstore")
public class NioBookStore {
@@ -49,31 +51,35 @@ public class NioBookStore {
IOUtils.readBytesFromStream(getClass().getResourceAsStream("/files/books.txt")));
final byte[] buffer = new byte[4096];
- return Response.ok().entity(
- out -> {
- try {
- final int n = in.read(buffer);
-
- if (n >= 0) {
- out.write(buffer, 0, n);
- return true;
- }
-
+ return Response
+ .ok()
+ .entity(
+ new NioWriteEntity(
+ out -> {
try {
- in.close();
+ final int n = in.read(buffer);
+
+ if (n >= 0) {
+ out.write(buffer, 0, n);
+ return true;
+ }
+
+ try {
+ in.close();
+ } catch (IOException ex) {
+ /* do nothing */
+ }
+
+ return false;
} catch (IOException ex) {
- /* do nothing */
+ throw new WebApplicationException(ex);
}
-
- return false;
- } catch (IOException ex) {
- throw new WebApplicationException(ex);
+ },
+ throwable -> {
+ throw throwable;
}
- },
- throwable -> {
- throw throwable;
- }
- ).build();
+ ))
+ .build();
}
@GET
@@ -92,34 +98,33 @@ public class NioBookStore {
final byte[] buffer = new byte[4096];
final LongAdder adder = new LongAdder();
- request.entity(
- in -> {
- try {
- final int n = in.read(buffer);
- if (n > 0) {
- adder.add(n);
- out.write(buffer, 0, n);
- }
- } catch (IOException e) {
- throw new WebApplicationException(e);
+ new NioReadEntity(
+ in -> {
+ try {
+ final int n = in.read(buffer);
+ if (n > 0) {
+ adder.add(n);
+ out.write(buffer, 0, n);
}
- },
- in -> {
- try {
- if (!in.isFinished()) {
- throw new IllegalStateException("Reader did not finish yet");
- }
-
- out.close();
- response.resume("Book Store uploaded: " + adder.longValue() + " bytes");
- } catch (IOException e) {
- throw new WebApplicationException(e);
+ } catch (IOException e) {
+ throw new WebApplicationException(e);
+ }
+ },
+ in -> {
+ try {
+ if (!in.isFinished()) {
+ throw new IllegalStateException("Reader did not finish yet");
}
- },
- throwable -> { // error handler
- System.out.println("Problem found: " + throwable.getMessage());
- throw throwable;
+
+ out.close();
+ response.resume("Book Store uploaded: " + adder.longValue() + " bytes");
+ } catch (IOException e) {
+ throw new WebApplicationException(e);
}
- );
+ },
+ throwable -> { // error handler
+ System.out.println("Problem found: " + throwable.getMessage());
+ throw throwable;
+ });
}
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
----------------------------------------------------------------------
diff --git a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java b/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
index b336cc9..c096713 100644
--- a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
+++ b/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
@@ -77,13 +77,13 @@ public class BookStore {
final Integer id = Integer.valueOf(lastEventId);
final Builder builder = sse.newEventBuilder();
- sink.onNext(createStatsEvent(builder.name("book"), id + 1));
+ sink.send(createStatsEvent(builder.name("book"), id + 1));
Thread.sleep(200);
- sink.onNext(createStatsEvent(builder.name("book"), id + 2));
+ sink.send(createStatsEvent(builder.name("book"), id + 2));
Thread.sleep(200);
- sink.onNext(createStatsEvent(builder.name("book"), id + 3));
+ sink.send(createStatsEvent(builder.name("book"), id + 3));
Thread.sleep(200);
- sink.onNext(createStatsEvent(builder.name("book"), id + 4));
+ sink.send(createStatsEvent(builder.name("book"), id + 4));
Thread.sleep(200);
sink.close();
} catch (final InterruptedException ex) {
@@ -98,7 +98,7 @@ public class BookStore {
@Produces(MediaType.SERVER_SENT_EVENTS)
public void broadcast(@Context SseEventSink sink) {
try {
- broadcaster.subscribe(sink);
+ broadcaster.register(sink);
} finally {
latch.countDown();
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
----------------------------------------------------------------------
diff --git a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java b/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
index b14e86a..a5eeb8e 100644
--- a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
+++ b/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
@@ -76,13 +76,13 @@ public class BookStore2 {
final Integer id = Integer.valueOf(lastEventId);
final Builder builder = sse.newEventBuilder();
- sink.onNext(createStatsEvent(builder.name("book"), id + 1));
+ sink.send(createStatsEvent(builder.name("book"), id + 1));
Thread.sleep(200);
- sink.onNext(createStatsEvent(builder.name("book"), id + 2));
+ sink.send(createStatsEvent(builder.name("book"), id + 2));
Thread.sleep(200);
- sink.onNext(createStatsEvent(builder.name("book"), id + 3));
+ sink.send(createStatsEvent(builder.name("book"), id + 3));
Thread.sleep(200);
- sink.onNext(createStatsEvent(builder.name("book"), id + 4));
+ sink.send(createStatsEvent(builder.name("book"), id + 4));
Thread.sleep(200);
sink.close();
} catch (final InterruptedException ex) {
@@ -97,7 +97,7 @@ public class BookStore2 {
@Produces(MediaType.SERVER_SENT_EVENTS)
public void broadcast(@Context SseEventSink sink) {
try {
- broadcaster.subscribe(sink);
+ broadcaster.register(sink);
} finally {
latch.countDown();
}