You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2017/04/30 13:24:31 UTC

cxf git commit: Updating to JAX-RS API 2.1 milestone 7. Keeping the NIO implementation but under CXF umbrella

Repository: cxf
Updated Branches:
  refs/heads/master a36af6323 -> 09121b3ab


Updating to JAX-RS API 2.1 milestone 7. Keeping the NIO implementation but under CXF umbrella


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/09121b3a
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/09121b3a
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/09121b3a

Branch: refs/heads/master
Commit: 09121b3abf54697f576d4d067190ce696f2f0338
Parents: a36af63
Author: reta <dr...@gmail.com>
Authored: Sat Apr 29 19:12:26 2017 -0400
Committer: reta <dr...@gmail.com>
Committed: Sat Apr 29 21:34:08 2017 -0400

----------------------------------------------------------------------
 .../demo/jaxrs/sse/StatsRestServiceImpl.java    |  2 +-
 .../demo/jaxrs/sse/StatsRestServiceImpl.java    | 16 ++--
 .../demo/jaxrs/sse/StatsRestServiceImpl.java    | 16 ++--
 parent/pom.xml                                  |  2 +-
 .../org/apache/cxf/jaxrs/impl/RequestImpl.java  | 38 +-------
 .../cxf/jaxrs/impl/ResponseBuilderImpl.java     | 14 ---
 .../cxf/jaxrs/impl/tl/ThreadLocalRequest.java   | 24 -----
 .../cxf/jaxrs/nio/DelegatingNioInputStream.java |  1 -
 .../jaxrs/nio/DelegatingNioOutputStream.java    |  2 -
 .../cxf/jaxrs/nio/NioCompletionHandler.java     | 25 +++++
 .../apache/cxf/jaxrs/nio/NioErrorHandler.java   | 30 ++++++
 .../apache/cxf/jaxrs/nio/NioInputStream.java    | 34 +++++++
 .../apache/cxf/jaxrs/nio/NioOutputStream.java   | 28 ++++++
 .../org/apache/cxf/jaxrs/nio/NioReadEntity.java | 22 ++++-
 .../apache/cxf/jaxrs/nio/NioReaderHandler.java  | 35 +++++++
 .../apache/cxf/jaxrs/nio/NioWriteEntity.java    |  3 -
 .../apache/cxf/jaxrs/nio/NioWriterHandler.java  | 34 +++++++
 .../cxf/jaxrs/provider/BinaryDataProvider.java  |  4 +-
 .../cxf/jaxrs/client/AsyncInvokerImpl.java      | 19 ++++
 .../client/CompletionStageRxInvokerImpl.java    | 15 +++
 .../cxf/jaxrs/client/SyncInvokerImpl.java       | 15 +++
 .../jaxrs/client/spec/ClientBuilderImpl.java    | 11 +++
 .../client/spec/InvocationBuilderImpl.java      | 28 ++----
 .../rx/client/ObservableRxInvokerImpl.java      | 15 +++
 .../cxf/jaxrs/sse/SseBroadcasterImpl.java       | 50 +++++-----
 .../cxf/jaxrs/sse/SseUnboundedSubscription.java | 72 --------------
 .../atmosphere/SseAtmosphereEventSinkImpl.java  | 56 +++++------
 .../cxf/systest/jaxrs/nio/NioBookStore.java     | 99 ++++++++++----------
 .../apache/cxf/systest/jaxrs/sse/BookStore.java | 10 +-
 .../cxf/systest/jaxrs/sse/BookStore2.java       | 10 +-
 30 files changed, 417 insertions(+), 313 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java b/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
index 0aa943c..63eae32 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
+++ b/distribution/src/main/release/samples/jax_rs/sse_cdi/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
@@ -64,7 +64,7 @@ public class StatsRestServiceImpl {
     @Path("sse")
     @Produces(MediaType.SERVER_SENT_EVENTS)
     public void stats(@Context SseEventSink sink) {
-        broadcaster.subscribe(sink);
+        broadcaster.register(sink);
     }
 
     private static OutboundSseEvent createStatsEvent(final OutboundSseEvent.Builder builder, final long eventId) {

http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java b/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
index f69946f..28730a3 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
+++ b/distribution/src/main/release/samples/jax_rs/sse_spring/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
@@ -53,21 +53,21 @@ public class StatsRestServiceImpl {
             public void run() {
                 try {
                     final Builder builder = sse.newEventBuilder();
-                    sink.onNext(createStatsEvent(builder.name("stats"), 1));
+                    sink.send(createStatsEvent(builder.name("stats"), 1));
                     Thread.sleep(1000);
-                    sink.onNext(createStatsEvent(builder.name("stats"), 2));
+                    sink.send(createStatsEvent(builder.name("stats"), 2));
                     Thread.sleep(1000);
-                    sink.onNext(createStatsEvent(builder.name("stats"), 3));
+                    sink.send(createStatsEvent(builder.name("stats"), 3));
                     Thread.sleep(1000);
-                    sink.onNext(createStatsEvent(builder.name("stats"), 4));
+                    sink.send(createStatsEvent(builder.name("stats"), 4));
                     Thread.sleep(1000);
-                    sink.onNext(createStatsEvent(builder.name("stats"), 5));
+                    sink.send(createStatsEvent(builder.name("stats"), 5));
                     Thread.sleep(1000);
-                    sink.onNext(createStatsEvent(builder.name("stats"), 6));
+                    sink.send(createStatsEvent(builder.name("stats"), 6));
                     Thread.sleep(1000);
-                    sink.onNext(createStatsEvent(builder.name("stats"), 7));
+                    sink.send(createStatsEvent(builder.name("stats"), 7));
                     Thread.sleep(1000);
-                    sink.onNext(createStatsEvent(builder.name("stats"), 8));
+                    sink.send(createStatsEvent(builder.name("stats"), 8));
                     sink.close();
                 } catch (final Exception e) {
                     e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/distribution/src/main/release/samples/jax_rs/sse_tomcat/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/sse_tomcat/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java b/distribution/src/main/release/samples/jax_rs/sse_tomcat/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
index 72ab7be..f2d8ff3 100644
--- a/distribution/src/main/release/samples/jax_rs/sse_tomcat/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
+++ b/distribution/src/main/release/samples/jax_rs/sse_tomcat/src/main/java/demo/jaxrs/sse/StatsRestServiceImpl.java
@@ -50,21 +50,21 @@ public class StatsRestServiceImpl {
             public void run() {
                 try {
                     final Builder builder = sse.newEventBuilder();
-                    sink.onNext(createStatsEvent(builder.name("stats"), 1));
+                    sink.send(createStatsEvent(builder.name("stats"), 1));
                     Thread.sleep(1000);
-                    sink.onNext(createStatsEvent(builder.name("stats"), 2));
+                    sink.send(createStatsEvent(builder.name("stats"), 2));
                     Thread.sleep(1000);
-                    sink.onNext(createStatsEvent(builder.name("stats"), 3));
+                    sink.send(createStatsEvent(builder.name("stats"), 3));
                     Thread.sleep(1000);
-                    sink.onNext(createStatsEvent(builder.name("stats"), 4));
+                    sink.send(createStatsEvent(builder.name("stats"), 4));
                     Thread.sleep(1000);
-                    sink.onNext(createStatsEvent(builder.name("stats"), 5));
+                    sink.send(createStatsEvent(builder.name("stats"), 5));
                     Thread.sleep(1000);
-                    sink.onNext(createStatsEvent(builder.name("stats"), 6));
+                    sink.send(createStatsEvent(builder.name("stats"), 6));
                     Thread.sleep(1000);
-                    sink.onNext(createStatsEvent(builder.name("stats"), 7));
+                    sink.send(createStatsEvent(builder.name("stats"), 7));
                     Thread.sleep(1000);
-                    sink.onNext(createStatsEvent(builder.name("stats"), 8));
+                    sink.send(createStatsEvent(builder.name("stats"), 8));
                     sink.close();
                 } catch (final Exception e) {
                     e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index d1f73fd..5b04d9b 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -110,7 +110,7 @@
         <cxf.geronimo.transaction.version>3.1.4</cxf.geronimo.transaction.version>
         <cxf.jasypt.bundle.version>1.9.0_1</cxf.jasypt.bundle.version>
         <cxf.javassist.version>3.19.0-GA</cxf.javassist.version>
-        <cxf.javax.ws.rs.version>2.1-m05</cxf.javax.ws.rs.version>
+        <cxf.javax.ws.rs.version>2.1-m07</cxf.javax.ws.rs.version>
         <cxf.jaxb.version>2.2.11</cxf.jaxb.version>
         <cxf.jaxb.impl.version>${cxf.jaxb.version}</cxf.jaxb.impl.version>
         <cxf.jaxb.core.version>${cxf.jaxb.version}</cxf.jaxb.core.version>

http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/RequestImpl.java
----------------------------------------------------------------------
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/RequestImpl.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/RequestImpl.java
index aa64910..bb99409 100644
--- a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/RequestImpl.java
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/RequestImpl.java
@@ -19,7 +19,6 @@
 
 package org.apache.cxf.jaxrs.impl;
 
-import java.io.IOException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Collections;
@@ -29,35 +28,28 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
 
-import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.HttpMethod;
 import javax.ws.rs.core.EntityTag;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.NioCompletionHandler;
-import javax.ws.rs.core.NioErrorHandler;
-import javax.ws.rs.core.NioReaderHandler;
 import javax.ws.rs.core.Request;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.ResponseBuilder;
 import javax.ws.rs.core.Variant;
 
 import org.apache.cxf.common.util.StringUtils;
-import org.apache.cxf.jaxrs.nio.NioReadEntity;
-import org.apache.cxf.jaxrs.nio.NioReadListenerImpl;
 import org.apache.cxf.jaxrs.utils.HttpUtils;
 import org.apache.cxf.jaxrs.utils.JAXRSUtils;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.phase.PhaseInterceptorChain;
-import org.apache.cxf.transport.http.AbstractHTTPDestination;
 
 /**
  * TODO : deal with InvalidStateExceptions
  *
  */
 
-public class RequestImpl implements Request {
 
+public class RequestImpl implements Request {
     private final Message m;
     private final HttpHeaders headers;
 
@@ -390,32 +382,4 @@ public class RequestImpl implements Request {
             return 0;
         }
     }
-
-    @Override
-    public void entity(NioReaderHandler reader) {
-        entity(reader, in -> { }, throwable -> { });
-    }
-
-    @Override
-    public void entity(NioReaderHandler reader, NioCompletionHandler completion) {
-        entity(reader, completion, throwable -> { });
-    }
-
-    @Override
-    public void entity(NioReaderHandler reader, NioErrorHandler error) {
-        entity(reader, in -> { }, error);
-    }
-
-    @Override
-    public void entity(NioReaderHandler reader, NioCompletionHandler completion, NioErrorHandler error) {
-        try {
-            final HttpServletRequest request = (HttpServletRequest)m.get(AbstractHTTPDestination.HTTP_REQUEST);
-            if (request != null) {
-                final NioReadEntity entity = new NioReadEntity(reader, completion, error);
-                request.getInputStream().setReadListener(new NioReadListenerImpl(entity, request.getInputStream()));
-            }
-        } catch (final IOException ex) {
-            throw new RuntimeException("Unable to initialize NIO entity", ex);
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/ResponseBuilderImpl.java
----------------------------------------------------------------------
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/ResponseBuilderImpl.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/ResponseBuilderImpl.java
index 6854c2b..0d87387 100644
--- a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/ResponseBuilderImpl.java
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/ResponseBuilderImpl.java
@@ -34,14 +34,11 @@ import javax.ws.rs.core.Link;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.MultivaluedMap;
 import javax.ws.rs.core.NewCookie;
-import javax.ws.rs.core.NioErrorHandler;
-import javax.ws.rs.core.NioWriterHandler;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.ResponseBuilder;
 import javax.ws.rs.core.UriInfo;
 import javax.ws.rs.core.Variant;
 
-import org.apache.cxf.jaxrs.nio.NioWriteEntity;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.phase.PhaseInterceptorChain;
 
@@ -317,15 +314,4 @@ public class ResponseBuilderImpl extends ResponseBuilder implements Cloneable {
         }
         return variants(Arrays.asList(variants));
     }
-
-    @Override
-    public ResponseBuilder entity(NioWriterHandler writerHandler) {
-        return entity(writerHandler, (NioErrorHandler)null);
-    }
-
-    @Override
-    public ResponseBuilder entity(NioWriterHandler writerHandler, NioErrorHandler errorHandler) {
-        entity = new NioWriteEntity(writerHandler, errorHandler);
-        return this;
-    }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/tl/ThreadLocalRequest.java
----------------------------------------------------------------------
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/tl/ThreadLocalRequest.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/tl/ThreadLocalRequest.java
index 20391d5..0ce54c9 100644
--- a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/tl/ThreadLocalRequest.java
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/tl/ThreadLocalRequest.java
@@ -23,9 +23,6 @@ import java.util.Date;
 import java.util.List;
 
 import javax.ws.rs.core.EntityTag;
-import javax.ws.rs.core.NioCompletionHandler;
-import javax.ws.rs.core.NioErrorHandler;
-import javax.ws.rs.core.NioReaderHandler;
 import javax.ws.rs.core.Request;
 import javax.ws.rs.core.Response.ResponseBuilder;
 import javax.ws.rs.core.Variant;
@@ -56,25 +53,4 @@ public class ThreadLocalRequest extends AbstractThreadLocalProxy<Request>
     public ResponseBuilder evaluatePreconditions() {
         return get().evaluatePreconditions();
     }
-
-    @Override
-    public void entity(NioReaderHandler reader) {
-        get().entity(reader);
-    }
-
-    @Override
-    public void entity(NioReaderHandler reader, NioCompletionHandler completion) {
-        get().entity(reader, completion);
-    }
-
-    @Override
-    public void entity(NioReaderHandler reader, NioErrorHandler error) {
-        get().entity(reader, error);
-    }
-
-    @Override
-    public void entity(NioReaderHandler reader, NioCompletionHandler completion, NioErrorHandler error) {
-        get().entity(reader, completion, error);
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/DelegatingNioInputStream.java
----------------------------------------------------------------------
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/DelegatingNioInputStream.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/DelegatingNioInputStream.java
index b2c5c32..65c97be 100644
--- a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/DelegatingNioInputStream.java
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/DelegatingNioInputStream.java
@@ -21,7 +21,6 @@ package org.apache.cxf.jaxrs.nio;
 import java.io.IOException;
 
 import javax.servlet.ServletInputStream;
-import javax.ws.rs.core.NioInputStream;
 
 public class DelegatingNioInputStream extends NioInputStream {
     private final ServletInputStream in;

http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/DelegatingNioOutputStream.java
----------------------------------------------------------------------
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/DelegatingNioOutputStream.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/DelegatingNioOutputStream.java
index 18d95a1..ec066be 100644
--- a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/DelegatingNioOutputStream.java
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/DelegatingNioOutputStream.java
@@ -21,8 +21,6 @@ package org.apache.cxf.jaxrs.nio;
 import java.io.IOException;
 import java.io.OutputStream;
 
-import javax.ws.rs.core.NioOutputStream;
-
 public class DelegatingNioOutputStream extends NioOutputStream {
     private final OutputStream out;
 

http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioCompletionHandler.java
----------------------------------------------------------------------
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioCompletionHandler.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioCompletionHandler.java
new file mode 100644
index 0000000..534dca9
--- /dev/null
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioCompletionHandler.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.nio;
+
+@FunctionalInterface
+public interface NioCompletionHandler {
+    void complete(NioInputStream in);
+}
+

http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioErrorHandler.java
----------------------------------------------------------------------
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioErrorHandler.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioErrorHandler.java
new file mode 100644
index 0000000..e7a538f
--- /dev/null
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioErrorHandler.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.nio;
+
+@FunctionalInterface
+public interface NioErrorHandler {
+    /**
+     * Method called when an exception or error occurred.
+     *
+     * @param throwable the error or exception encountered.
+     */
+    void error(Throwable throwable) throws Throwable;
+}
+

http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioInputStream.java
----------------------------------------------------------------------
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioInputStream.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioInputStream.java
new file mode 100644
index 0000000..f75b890
--- /dev/null
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioInputStream.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.nio;
+
+import java.io.InputStream;
+
+/**
+ * Class NioInputStream.
+ */
+public abstract class NioInputStream extends InputStream {
+    /**
+     * Checks if the input stream has been consumed.
+     *
+     * @return outcome of test.
+     */
+    public abstract boolean isFinished();
+}
+

http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioOutputStream.java
----------------------------------------------------------------------
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioOutputStream.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioOutputStream.java
new file mode 100644
index 0000000..3ac1fdf
--- /dev/null
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioOutputStream.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.nio;
+
+import java.io.OutputStream;
+
+/**
+ * Class NioOutputStream.
+ */
+public abstract class NioOutputStream extends OutputStream {
+}
+

http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioReadEntity.java
----------------------------------------------------------------------
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioReadEntity.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioReadEntity.java
index 754be40..8804401 100644
--- a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioReadEntity.java
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioReadEntity.java
@@ -18,9 +18,13 @@
  */
 package org.apache.cxf.jaxrs.nio;
 
-import javax.ws.rs.core.NioCompletionHandler;
-import javax.ws.rs.core.NioErrorHandler;
-import javax.ws.rs.core.NioReaderHandler;
+import java.io.IOException;
+
+import javax.servlet.http.HttpServletRequest;
+
+import org.apache.cxf.jaxrs.utils.JAXRSUtils;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.transport.http.AbstractHTTPDestination;
 
 public class NioReadEntity {
     private final NioReaderHandler reader;
@@ -31,6 +35,18 @@ public class NioReadEntity {
         this.reader = reader;
         this.completion = completion;
         this.error = error;
+        
+        try {
+            final Message m = JAXRSUtils.getCurrentMessage();
+            if (m != null) {
+                final HttpServletRequest request = (HttpServletRequest)m.get(AbstractHTTPDestination.HTTP_REQUEST);
+                if (request != null) {
+                    request.getInputStream().setReadListener(new NioReadListenerImpl(this, request.getInputStream()));
+                }
+            }
+        } catch (final IOException ex) {
+            throw new RuntimeException("Unable to initialize NIO entity", ex);
+        }
     }
 
     public NioReaderHandler getReader() {

http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioReaderHandler.java
----------------------------------------------------------------------
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioReaderHandler.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioReaderHandler.java
new file mode 100644
index 0000000..912684f
--- /dev/null
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioReaderHandler.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.nio;
+
+/**
+ * Class NioReaderHandler.
+ */
+@FunctionalInterface
+public interface NioReaderHandler {
+    /**
+     * Called every time it is possible to read from the input stream without blocking. The last
+     * time this method is called, the value of {@code in.isFinished()} must be {@code true} to
+     * indicate that all the stream has been read.
+     *
+     * @param in input stream.
+     */
+    void read(NioInputStream in);
+}
+

http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioWriteEntity.java
----------------------------------------------------------------------
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioWriteEntity.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioWriteEntity.java
index 2df0795..1f09f92 100644
--- a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioWriteEntity.java
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioWriteEntity.java
@@ -18,9 +18,6 @@
  */
 package org.apache.cxf.jaxrs.nio;
 
-import javax.ws.rs.core.NioErrorHandler;
-import javax.ws.rs.core.NioWriterHandler;
-
 public final class NioWriteEntity {
     private final NioWriterHandler writer;
     private final NioErrorHandler error;

http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioWriterHandler.java
----------------------------------------------------------------------
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioWriterHandler.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioWriterHandler.java
new file mode 100644
index 0000000..920b3a1
--- /dev/null
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioWriterHandler.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.nio;
+
+/**
+ * Class NioWriterHandler.
+ */
+@FunctionalInterface
+public interface NioWriterHandler {
+    /**
+     * Method called when it is possible to write some data without blocking.
+     *
+     * @param out output stream.
+     * @return {@code true} if there is more data to write, {@code false} otherwise.
+     */
+    boolean write(NioOutputStream out);
+}
+

http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/provider/BinaryDataProvider.java
----------------------------------------------------------------------
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/provider/BinaryDataProvider.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/provider/BinaryDataProvider.java
index 0b8ad53..0abfd4b 100644
--- a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/provider/BinaryDataProvider.java
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/provider/BinaryDataProvider.java
@@ -42,8 +42,6 @@ import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.MultivaluedMap;
-import javax.ws.rs.core.NioOutputStream;
-import javax.ws.rs.core.NioWriterHandler;
 import javax.ws.rs.core.StreamingOutput;
 import javax.ws.rs.ext.MessageBodyReader;
 import javax.ws.rs.ext.MessageBodyWriter;
@@ -57,8 +55,10 @@ import org.apache.cxf.helpers.FileUtils;
 import org.apache.cxf.helpers.IOUtils;
 import org.apache.cxf.jaxrs.impl.HttpHeadersImpl;
 import org.apache.cxf.jaxrs.nio.DelegatingNioOutputStream;
+import org.apache.cxf.jaxrs.nio.NioOutputStream;
 import org.apache.cxf.jaxrs.nio.NioWriteEntity;
 import org.apache.cxf.jaxrs.nio.NioWriteListenerImpl;
+import org.apache.cxf.jaxrs.nio.NioWriterHandler;
 import org.apache.cxf.jaxrs.utils.AnnotationUtils;
 import org.apache.cxf.jaxrs.utils.ExceptionUtils;
 import org.apache.cxf.jaxrs.utils.JAXRSUtils;

http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AsyncInvokerImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AsyncInvokerImpl.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AsyncInvokerImpl.java
index 9be13ba..b3a2e99 100644
--- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AsyncInvokerImpl.java
+++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AsyncInvokerImpl.java
@@ -214,5 +214,24 @@ public class AsyncInvokerImpl implements AsyncInvoker {
                                      null,
                                      callback);
     }
+    @Override
+    public Future<Response> patch(Entity<?> entity) {
+        return method(HttpMethod.PATCH, entity);
+    }
+    
+    @Override
+    public <T> Future<T> patch(Entity<?> entity, Class<T> responseType) {
+        return method(HttpMethod.PATCH, entity, responseType);
+    }
+    
+    @Override
+    public <T> Future<T> patch(Entity<?> entity, GenericType<T> responseType) {
+        return method(HttpMethod.PATCH, entity, responseType);
+    }
+ 
+    @Override
+    public <T> Future<T> patch(Entity<?> entity, InvocationCallback<T> callback) {
+        return method(HttpMethod.PATCH, entity, callback);
+    }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/CompletionStageRxInvokerImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/CompletionStageRxInvokerImpl.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/CompletionStageRxInvokerImpl.java
index f60c45b..822948f 100644
--- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/CompletionStageRxInvokerImpl.java
+++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/CompletionStageRxInvokerImpl.java
@@ -160,4 +160,19 @@ public class CompletionStageRxInvokerImpl implements CompletionStageRxInvoker {
         return wc.doInvokeAsyncStage(name, null, responseType.getRawType(), responseType.getType(), ex);
     }
 
+    @Override
+    public CompletionStage<Response> patch(Entity<?> entity) {
+        return method(HttpMethod.PATCH, entity);
+    }
+
+    @Override
+    public <T> CompletionStage<T> patch(Entity<?> entity, Class<T> responseType) {
+        return method(HttpMethod.PATCH, entity, responseType);
+    }
+
+    @Override
+    public <T> CompletionStage<T> patch(Entity<?> entity, GenericType<T> responseType) {
+        return method(HttpMethod.PATCH, entity, responseType);
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/SyncInvokerImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/SyncInvokerImpl.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/SyncInvokerImpl.java
index 78ebd37..57b57e4 100644
--- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/SyncInvokerImpl.java
+++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/SyncInvokerImpl.java
@@ -158,4 +158,19 @@ public class SyncInvokerImpl implements SyncInvoker {
     public WebClient getWebClient() {
         return wc;
     }
+
+    @Override
+    public Response patch(Entity<?> entity) {
+        return method(HttpMethod.PATCH, entity);
+    }
+
+    @Override
+    public <T> T patch(Entity<?> entity, Class<T> responseType) {
+        return method(HttpMethod.PATCH, entity, responseType);
+    }
+
+    @Override
+    public <T> T patch(Entity<?> entity, GenericType<T> responseType) {
+        return method(HttpMethod.PATCH, entity, responseType);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/ClientBuilderImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/ClientBuilderImpl.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/ClientBuilderImpl.java
index 79a968e..d2befac 100644
--- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/ClientBuilderImpl.java
+++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/ClientBuilderImpl.java
@@ -20,6 +20,8 @@ package org.apache.cxf.jaxrs.client.spec;
 
 import java.security.KeyStore;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
 
 import javax.net.ssl.HostnameVerifier;
 import javax.net.ssl.KeyManagerFactory;
@@ -148,4 +150,13 @@ public class ClientBuilderImpl extends ClientBuilder {
         return this;
     }
 
+    @Override
+    public ClientBuilder executorService(ExecutorService executorService) {
+        return configImpl.property("executorService", executorService);
+    }
+
+    @Override
+    public ClientBuilder scheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
+        return configImpl.property("scheduledExecutorService", scheduledExecutorService);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java
index 68f4be6..62219b5 100644
--- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java
+++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java
@@ -32,7 +32,6 @@ import javax.ws.rs.client.Entity;
 import javax.ws.rs.client.Invocation;
 import javax.ws.rs.client.Invocation.Builder;
 import javax.ws.rs.client.InvocationCallback;
-import javax.ws.rs.client.NioInvoker;
 import javax.ws.rs.client.RxInvoker;
 import javax.ws.rs.client.SyncInvoker;
 import javax.ws.rs.core.CacheControl;
@@ -377,34 +376,27 @@ public class InvocationBuilderImpl implements Invocation.Builder {
 
     @Override
     public CompletionStageRxInvoker rx() {
-        return rx((ExecutorService)null);
+        return webClient.rx((ExecutorService)null);
     }
 
+    @SuppressWarnings("rawtypes")
     @Override
-    public CompletionStageRxInvoker rx(ExecutorService executorService) {
-        // TODO: At the moment we still delegate if possible to the async HTTP conduit.
-        // Investigate if letting the CompletableFuture thread pool deal with the sync invocation
-        // is indeed more effective
-
-        return webClient.rx(executorService);
+    public <T extends RxInvoker> T rx(Class<T> rxCls) {
+        return webClient.rx(rxCls, (ExecutorService)null);
     }
 
-
-    @SuppressWarnings("rawtypes")
     @Override
-    public <T extends RxInvoker> T rx(Class<T> rxCls) {
-        return rx(rxCls, (ExecutorService)null);
+    public Response patch(Entity<?> entity) {
+        return sync.patch(entity);
     }
 
-    @SuppressWarnings("rawtypes")
     @Override
-    public <T extends RxInvoker> T rx(Class<T> rxCls, ExecutorService executorService) {
-        return webClient.rx(rxCls, executorService);
+    public <T> T patch(Entity<?> entity, Class<T> responseType) {
+        return sync.patch(entity, responseType);
     }
 
     @Override
-    public NioInvoker nio() {
-        // TODO: Implementation required (JAX-RS 2.1)
-        return null;
+    public <T> T patch(Entity<?> entity, GenericType<T> responseType) {
+        return sync.patch(entity, responseType);
     }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvokerImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvokerImpl.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvokerImpl.java
index 8088f85..0a6033c 100644
--- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvokerImpl.java
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvokerImpl.java
@@ -171,4 +171,19 @@ public class ObservableRxInvokerImpl implements ObservableRxInvoker {
         wc.prepareAsyncClient(httpMethod, body, null, null, respClass, outType, cb);
         return cb.getObservable();
     }
+
+    @Override
+    public Observable<Response> patch(Entity<?> entity) {
+        return method(HttpMethod.PATCH, entity);
+    }
+
+    @Override
+    public <T> Observable<T> patch(Entity<?> entity, Class<T> responseType) {
+        return method(HttpMethod.PATCH, entity, responseType);
+    }
+
+    @Override
+    public <T> Observable<T> patch(Entity<?> entity, GenericType<T> responseType) {
+        return method(HttpMethod.PATCH, entity, responseType);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
index 8a24369..3884254 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseBroadcasterImpl.java
@@ -18,68 +18,62 @@
  */
 package org.apache.cxf.jaxrs.sse;
 
-import java.util.Map;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
-import javax.ws.rs.Flow;
-import javax.ws.rs.Flow.Subscriber;
 import javax.ws.rs.sse.OutboundSseEvent;
 import javax.ws.rs.sse.SseBroadcaster;
+import javax.ws.rs.sse.SseEventSink;
 
 public class SseBroadcasterImpl implements SseBroadcaster {
-    private final Map<Flow.Subscriber<? super OutboundSseEvent>, SseUnboundedSubscription> subscribers =
-            new ConcurrentHashMap<>();
+    private final Set<SseEventSink> subscribers = new CopyOnWriteArraySet<>();
 
-    private final Set<Consumer<Subscriber<? super OutboundSseEvent>>> closers =
+    private final Set<Consumer<SseEventSink>> closers =
             new CopyOnWriteArraySet<>();
 
-    private final Set<BiConsumer<Subscriber<? super OutboundSseEvent>, Throwable>> exceptioners =
+    private final Set<BiConsumer<SseEventSink, Throwable>> exceptioners =
             new CopyOnWriteArraySet<>();
 
     @Override
-    public void subscribe(Flow.Subscriber<? super OutboundSseEvent> subscriber) {
-        try {
-            if (!subscribers.containsKey(subscriber)) {
-                final SseUnboundedSubscription subscription = new SseUnboundedSubscription(subscriber);
-                if (subscribers.putIfAbsent(subscriber, subscription) == null) {
-                    subscriber.onSubscribe(subscription);
-                }
-            }
-        } catch (final Exception ex) {
-            subscriber.onError(ex);
-        }
+    public void register(SseEventSink sink) {
+        subscribers.add(sink);
     }
 
     @Override
-    public void broadcast(OutboundSseEvent event) {
-        for (Map.Entry<Flow.Subscriber<? super OutboundSseEvent>, SseUnboundedSubscription> entry 
-            : subscribers.entrySet()) {
+    public CompletionStage<?> broadcast(OutboundSseEvent event) {
+        final Collection<CompletableFuture<?>> futures = new ArrayList<>();
+        
+        for (SseEventSink sink: subscribers) {
             try {
-                entry.getValue().send(event);
+                futures.add(sink.send(event).toCompletableFuture());
             } catch (final Exception ex) {
-                exceptioners.forEach(exceptioner -> exceptioner.accept(entry.getKey(), ex));
+                exceptioners.forEach(exceptioner -> exceptioner.accept(sink, ex));
             }
         }
+        
+        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
     }
 
     @Override
-    public void onClose(Consumer<Subscriber<? super OutboundSseEvent>> subscriber) {
+    public void onClose(Consumer<SseEventSink> subscriber) {
         closers.add(subscriber);
     }
 
     @Override
-    public void onError(BiConsumer<Subscriber<? super OutboundSseEvent>, Throwable> exceptioner) {
+    public void onError(BiConsumer<SseEventSink, Throwable> exceptioner) {
         exceptioners.add(exceptioner);
     }
 
     @Override
     public void close() {
-        subscribers.keySet().forEach(subscriber -> {
-            subscriber.onComplete();
+        subscribers.forEach(subscriber -> {
+            subscriber.close();
             closers.forEach(closer -> closer.accept(subscriber));
         });
     }

http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseUnboundedSubscription.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseUnboundedSubscription.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseUnboundedSubscription.java
deleted file mode 100644
index 92f24ab..0000000
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/SseUnboundedSubscription.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.sse;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import javax.ws.rs.Flow;
-import javax.ws.rs.Flow.Subscription;
-import javax.ws.rs.sse.OutboundSseEvent;
-
-class SseUnboundedSubscription implements Subscription {
-    // Has subscription been cancelled or not?
-    private boolean cancelled;
-    // Current demand: what has been requested but not yet delivered
-    private long demand;
-    private final BlockingQueue<OutboundSseEvent> buffer = new LinkedBlockingQueue<>(); 
-    private final Flow.Subscriber<? super OutboundSseEvent> subscriber;
-    
-    SseUnboundedSubscription(Flow.Subscriber<? super OutboundSseEvent> subscriber) {
-        this.subscriber = subscriber;
-    }
-    
-    public void request(long n) {
-        if (demand + n < 1) {
-            // Effectively unbounded demand 
-            demand = Long.MAX_VALUE;
-            send();
-        } else {
-            // Here we record the downstream demand
-            demand += n;
-            send();
-        }
-    }
-
-    @Override
-    public void cancel() {
-        cancelled = true;
-    }
-    
-    public void send(OutboundSseEvent event) throws InterruptedException {
-        if (!cancelled && buffer.offer(event)) {
-            send();
-        }
-    }
-    
-    private void send() {
-        while (!cancelled && demand > 0 && !buffer.isEmpty()) {
-            final OutboundSseEvent event = buffer.poll();
-            if (event != null) {
-                subscriber.onNext(event);
-                --demand;
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkImpl.java
index 268e9d2..a79eff7 100644
--- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkImpl.java
+++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/atmosphere/SseAtmosphereEventSinkImpl.java
@@ -22,13 +22,13 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.lang.annotation.Annotation;
 import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.logging.Logger;
 
-import javax.ws.rs.Flow.Subscription;
 import javax.ws.rs.ext.MessageBodyWriter;
 import javax.ws.rs.sse.OutboundSseEvent;
 import javax.ws.rs.sse.SseEventSink;
@@ -97,7 +97,9 @@ public class SseAtmosphereEventSinkImpl implements SseEventSink {
     }
 
     @Override
-    public void onNext(OutboundSseEvent event) {
+    public CompletionStage<?> send(OutboundSseEvent event) {
+        final CompletableFuture<?> future = new CompletableFuture<>();
+        
         if (!closed && writer != null) {
             try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
                 writer.writeTo(event, event.getClass(), null, new Annotation [] {}, event.getMediaType(), null, os);
@@ -105,41 +107,27 @@ public class SseAtmosphereEventSinkImpl implements SseEventSink {
                 // Atmosphere broadcasts asynchronously which is acceptable in most cases.
                 // Unfortunately, calling close() may lead to response stream being closed
                 // while there are still some SSE delivery scheduled.
-                final Future<Object> future = resource
-                    .getBroadcaster()
-                    .broadcast(os.toString(StandardCharsets.UTF_8.name()));
-
-                try {
-                    if (!future.isDone()) {
-                        // Let us wait at least 200 milliseconds before returning to ensure
-                        // that SSE had the opportunity to be delivered.
-                        LOG.fine("Waiting 200ms to ensure SSE Atmosphere response is delivered");
-                        future.get(200, TimeUnit.MILLISECONDS);
-                    }
-                } catch (final ExecutionException | InterruptedException ex) {
-                    throw new IOException(ex);
-                } catch (final TimeoutException ex) {
-                    LOG.warning("SSE Atmosphere response was not delivered within default timeout");
-                }
+                return CompletableFuture.completedFuture(
+                    resource
+                        .getBroadcaster()
+                        .broadcast(os.toString(StandardCharsets.UTF_8.name()))
+                        .get(1, TimeUnit.SECONDS)
+                    );
             } catch (final IOException ex) {
                 LOG.warning("While writing the SSE event, an exception was raised: " + ex);
+                future.completeExceptionally(ex);
+            } catch (final ExecutionException | InterruptedException ex) {
+                LOG.warning("SSE Atmosphere response was not delivered");
+                future.completeExceptionally(ex);
+            } catch (final TimeoutException ex) {
+                LOG.warning("SSE Atmosphere response was not delivered within default timeout");
+                future.completeExceptionally(ex);
             }
+        } else {
+            future.complete(null);
         }
-    }
-
-    @Override
-    public void onError(Throwable throwable) {
-        // TODO: Should we close the response?
-    }
-
-    @Override
-    public void onComplete() {
-        close();
-    }
-
-    @Override
-    public void onSubscribe(Subscription subscription) {
-        subscription.request(Long.MAX_VALUE);
+        
+        return future;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/nio/NioBookStore.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/nio/NioBookStore.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/nio/NioBookStore.java
index 62dda23..1498653 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/nio/NioBookStore.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/nio/NioBookStore.java
@@ -39,6 +39,8 @@ import javax.ws.rs.core.Response;
 
 import org.apache.cxf.annotations.UseNio;
 import org.apache.cxf.helpers.IOUtils;
+import org.apache.cxf.jaxrs.nio.NioReadEntity;
+import org.apache.cxf.jaxrs.nio.NioWriteEntity;
 
 @Path("/bookstore")
 public class NioBookStore {
@@ -49,31 +51,35 @@ public class NioBookStore {
             IOUtils.readBytesFromStream(getClass().getResourceAsStream("/files/books.txt")));
         final byte[] buffer = new byte[4096];
 
-        return Response.ok().entity(
-            out -> {
-                try {
-                    final int n = in.read(buffer);
-
-                    if (n >= 0) {
-                        out.write(buffer, 0, n);
-                        return true;
-                    }
-
+        return Response
+            .ok()
+            .entity(
+                new NioWriteEntity(
+                out -> {
                     try {
-                        in.close();
+                        final int n = in.read(buffer);
+    
+                        if (n >= 0) {
+                            out.write(buffer, 0, n);
+                            return true;
+                        }
+    
+                        try {
+                            in.close();
+                        } catch (IOException ex) {
+                            /* do nothing */
+                        }
+    
+                        return false;
                     } catch (IOException ex) {
-                        /* do nothing */
+                        throw new WebApplicationException(ex);
                     }
-
-                    return false;
-                } catch (IOException ex) {
-                    throw new WebApplicationException(ex);
+                },
+                throwable -> {
+                    throw throwable;
                 }
-            },
-            throwable -> {
-                throw throwable;
-            }
-        ).build();
+            ))
+            .build();
     }
 
     @GET
@@ -92,34 +98,33 @@ public class NioBookStore {
         final byte[] buffer = new byte[4096];
         final LongAdder adder = new LongAdder();
 
-        request.entity(
-            in -> {
-                try {
-                    final int n = in.read(buffer);
-                    if (n > 0) {
-                        adder.add(n);
-                        out.write(buffer, 0, n);
-                    }
-                } catch (IOException e) {
-                    throw new WebApplicationException(e);
+        new NioReadEntity(
+        in -> {
+            try {
+                final int n = in.read(buffer);
+                if (n > 0) {
+                    adder.add(n);
+                    out.write(buffer, 0, n);
                 }
-            },
-            in -> {
-                try {
-                    if (!in.isFinished()) {
-                        throw new IllegalStateException("Reader did not finish yet");
-                    }
-
-                    out.close();
-                    response.resume("Book Store uploaded: " + adder.longValue() + " bytes");
-                } catch (IOException e) {
-                    throw new WebApplicationException(e);
+            } catch (IOException e) {
+                throw new WebApplicationException(e);
+            }
+        },
+        in -> {
+            try {
+                if (!in.isFinished()) {
+                    throw new IllegalStateException("Reader did not finish yet");
                 }
-            },
-            throwable -> {              // error handler
-                System.out.println("Problem found: " + throwable.getMessage());
-                throw throwable;
+
+                out.close();
+                response.resume("Book Store uploaded: " + adder.longValue() + " bytes");
+            } catch (IOException e) {
+                throw new WebApplicationException(e);
             }
-        );
+        },
+        throwable -> {              // error handler
+            System.out.println("Problem found: " + throwable.getMessage());
+            throw throwable;
+        });
     }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
----------------------------------------------------------------------
diff --git a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java b/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
index b336cc9..c096713 100644
--- a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
+++ b/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/BookStore.java
@@ -77,13 +77,13 @@ public class BookStore {
                     final Integer id = Integer.valueOf(lastEventId);
                     final Builder builder = sse.newEventBuilder();
 
-                    sink.onNext(createStatsEvent(builder.name("book"), id + 1));
+                    sink.send(createStatsEvent(builder.name("book"), id + 1));
                     Thread.sleep(200);
-                    sink.onNext(createStatsEvent(builder.name("book"), id + 2));
+                    sink.send(createStatsEvent(builder.name("book"), id + 2));
                     Thread.sleep(200);
-                    sink.onNext(createStatsEvent(builder.name("book"), id + 3));
+                    sink.send(createStatsEvent(builder.name("book"), id + 3));
                     Thread.sleep(200);
-                    sink.onNext(createStatsEvent(builder.name("book"), id + 4));
+                    sink.send(createStatsEvent(builder.name("book"), id + 4));
                     Thread.sleep(200);
                     sink.close();
                 } catch (final InterruptedException ex) {
@@ -98,7 +98,7 @@ public class BookStore {
     @Produces(MediaType.SERVER_SENT_EVENTS)
     public void broadcast(@Context SseEventSink sink) {
         try {
-            broadcaster.subscribe(sink);
+            broadcaster.register(sink);
         } finally {
             latch.countDown();
         }

http://git-wip-us.apache.org/repos/asf/cxf/blob/09121b3a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
----------------------------------------------------------------------
diff --git a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java b/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
index b14e86a..a5eeb8e 100644
--- a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
+++ b/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/BookStore2.java
@@ -76,13 +76,13 @@ public class BookStore2 {
                     final Integer id = Integer.valueOf(lastEventId);
                     final Builder builder = sse.newEventBuilder();
 
-                    sink.onNext(createStatsEvent(builder.name("book"), id + 1));
+                    sink.send(createStatsEvent(builder.name("book"), id + 1));
                     Thread.sleep(200);
-                    sink.onNext(createStatsEvent(builder.name("book"), id + 2));
+                    sink.send(createStatsEvent(builder.name("book"), id + 2));
                     Thread.sleep(200);
-                    sink.onNext(createStatsEvent(builder.name("book"), id + 3));
+                    sink.send(createStatsEvent(builder.name("book"), id + 3));
                     Thread.sleep(200);
-                    sink.onNext(createStatsEvent(builder.name("book"), id + 4));
+                    sink.send(createStatsEvent(builder.name("book"), id + 4));
                     Thread.sleep(200);
                     sink.close();
                 } catch (final InterruptedException ex) {
@@ -97,7 +97,7 @@ public class BookStore2 {
     @Produces(MediaType.SERVER_SENT_EVENTS)
     public void broadcast(@Context SseEventSink sink) {
         try {
-            broadcaster.subscribe(sink);
+            broadcaster.register(sink);
         } finally {
             latch.countDown();
         }