You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2020/03/29 13:48:21 UTC

[cxf] branch 3.3.x-fixes updated: CXF-8242: Stop blocking executor thread on microprofile rest asynchronous call (#654)

This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch 3.3.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git


The following commit(s) were added to refs/heads/3.3.x-fixes by this push:
     new 6b602d3  CXF-8242: Stop blocking executor thread on microprofile rest asynchronous call (#654)
6b602d3 is described below

commit 6b602d3c267c5c73b2bced1c5a8368f0d6a06da1
Author: Andriy Redko <dr...@gmail.com>
AuthorDate: Sat Mar 28 16:32:33 2020 -0400

    CXF-8242: Stop blocking executor thread on microprofile rest asynchronous call (#654)
    
    (cherry picked from commit 3921fc29c265addfa36d0b4260903eef96ac1e25)
---
 .../org/apache/cxf/endpoint/ClientCallback.java    |  79 ++++------
 .../apache/cxf/endpoint/ClientCallbackTest.java    | 164 ++++++++++++++++++++
 .../org/apache/cxf/jaxws/JaxwsClientCallback.java  |  17 ++-
 .../apache/cxf/jaxws/JaxwsClientCallbackTest.java  | 162 ++++++++++++++++++++
 .../cxf/jaxrs/client/JaxrsClientCallback.java      |  10 +-
 .../cxf/jaxrs/client/JaxrsClientCallbackTest.java  | 169 +++++++++++++++++++++
 .../microprofile/client/MPRestClientCallback.java  |  31 +---
 .../client/MPRestClientCallbackTest.java           | 161 ++++++++++++++++++++
 8 files changed, 700 insertions(+), 93 deletions(-)

diff --git a/core/src/main/java/org/apache/cxf/endpoint/ClientCallback.java b/core/src/main/java/org/apache/cxf/endpoint/ClientCallback.java
index ed35858..b9563c7 100644
--- a/core/src/main/java/org/apache/cxf/endpoint/ClientCallback.java
+++ b/core/src/main/java/org/apache/cxf/endpoint/ClientCallback.java
@@ -20,6 +20,8 @@
 package org.apache.cxf.endpoint;
 
 import java.util.Map;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -40,12 +42,8 @@ import org.apache.cxf.message.Message;
  * </ol>
  */
 public class ClientCallback implements Future<Object[]> {
-
+    protected final CompletableFuture<Object[]> delegate = new CompletableFuture<Object[]>();
     protected Map<String, Object> context;
-    protected Object[] result;
-    protected Throwable exception;
-    protected volatile boolean done;
-    protected boolean cancelled;
     protected boolean started;
 
     public ClientCallback() {
@@ -72,8 +70,8 @@ public class ClientCallback implements Future<Object[]> {
      */
     public void handleResponse(Map<String, Object> ctx, Object[] res) {
         context = ctx;
-        result = res;
-        done = true;
+        delegate.complete(res);
+
         synchronized (this) {
             notifyAll();
         }
@@ -91,8 +89,8 @@ public class ClientCallback implements Future<Object[]> {
      */
     public void handleException(Map<String, Object> ctx, Throwable ex) {
         context = ctx;
-        exception = ex;
-        done = true;
+        delegate.completeExceptionally(ex);
+        
         synchronized (this) {
             notifyAll();
         }
@@ -101,10 +99,12 @@ public class ClientCallback implements Future<Object[]> {
 
     public boolean cancel(boolean mayInterruptIfRunning) {
         if (!started) {
-            cancelled = true;
+            delegate.cancel(mayInterruptIfRunning);
+            
             synchronized (this) {
                 notifyAll();
             }
+            
             return true;
         }
         return false;
@@ -118,15 +118,15 @@ public class ClientCallback implements Future<Object[]> {
      */
     public Map<String, Object> getResponseContext() throws InterruptedException, ExecutionException {
         synchronized (this) {
-            if (!done) {
+            if (!delegate.isDone()) {
                 wait();
             }
         }
-        if (cancelled) {
+        if (delegate.isCancelled()) {
             throw new InterruptedException("Operation Cancelled");
         }
-        if (exception != null) {
-            throw new ExecutionException(exception);
+        if (delegate.isCompletedExceptionally()) {
+            delegate.get();
         }
         return context;
     }
@@ -135,55 +135,32 @@ public class ClientCallback implements Future<Object[]> {
      * {@inheritDoc}
      */
     public Object[] get() throws InterruptedException, ExecutionException {
-        synchronized (this) {
-            if (!done) {
-                wait();
-            }
+        try {
+            return delegate.get();
+        } catch (final CancellationException ex) {
+            // Preserving the exception raised by former implementation
+            throw new InterruptedException("Operation has been cancelled");
         }
-        if (cancelled) {
-            throw new InterruptedException("Operation Cancelled");
-        }
-        if (exception != null) {
-            throw new ExecutionException(exception);
-        }
-        return result;
     }
 
     /**
      * {@inheritDoc}
      */
-    public Object[] get(long timeout, TimeUnit unit)
-        throws InterruptedException, ExecutionException, TimeoutException {
-        synchronized (this) {
-            if (!done) {
-                unit.timedWait(this, timeout);
-            }
-        }
-        if (cancelled) {
-            throw new InterruptedException("Operation Cancelled");
-        }
-        if (!done) {
-            throw new TimeoutException("Timeout Exceeded");
-        }
-        if (exception != null) {
-            throw new ExecutionException(exception);
+    public Object[] get(long timeout, TimeUnit unit) throws InterruptedException, 
+            ExecutionException, TimeoutException {
+        try {
+            return delegate.get(timeout, unit);
+        } catch (final CancellationException ex) {
+            // Preserving the exception raised by former implementation
+            throw new InterruptedException("Operation has been cancelled");
         }
-        return result;
     }
 
     public boolean isCancelled() {
-        return cancelled;
+        return delegate.isCancelled();
     }
 
     public boolean isDone() {
-        return done;
+        return delegate.isDone();
     }
-
-    /*
-     * If the operation completes with a fault, the resulting exception object ends up here.
-     */
-    public Throwable getException() {
-        return exception;
-    }
-
 }
diff --git a/core/src/test/java/org/apache/cxf/endpoint/ClientCallbackTest.java b/core/src/test/java/org/apache/cxf/endpoint/ClientCallbackTest.java
new file mode 100644
index 0000000..2a3f74a
--- /dev/null
+++ b/core/src/test/java/org/apache/cxf/endpoint/ClientCallbackTest.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.endpoint;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.cxf.message.MessageImpl;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+public class ClientCallbackTest {
+    private Map<String, Object> ctx;
+    private ClientCallback callback;
+    private ScheduledExecutorService executor;
+    
+    @Before
+    public void setUp() {
+        executor = Executors.newSingleThreadScheduledExecutor();
+        callback = new ClientCallback();
+        ctx = new HashMap<String, Object>();
+    }
+    
+    @After
+    public void tearDown() throws Exception {
+        executor.shutdown();
+        executor.awaitTermination(5, TimeUnit.SECONDS);
+    }
+    
+    @Test
+    public void testHandleResponseCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(2);
+
+        Object[] result = new Object[0];
+        schedule(barrier, () -> callback.handleResponse(ctx, result));
+
+        synchronized (callback) {
+            barrier.await(5, TimeUnit.SECONDS);
+            callback.wait();
+        }
+
+        assertThat(callback.get(), equalTo(result));
+        assertThat(callback.get(10, TimeUnit.MILLISECONDS), equalTo(result));
+        assertThat(callback.isCancelled(), equalTo(false));
+        assertThat(callback.isDone(), equalTo(true));
+    }
+    
+    @Test
+    public void testGetResponseContextOnSuccessCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(1);
+
+        Object[] result = new Object[0];
+        schedule(barrier, () -> callback.handleResponse(ctx, result));
+
+        assertThat(callback.getResponseContext(), equalTo(ctx));
+        assertThat(callback.get(), equalTo(result));
+        assertThat(callback.get(10, TimeUnit.MILLISECONDS), equalTo(result));
+        assertThat(callback.isCancelled(), equalTo(false));
+        assertThat(callback.isDone(), equalTo(true));
+    }
+    
+    @Test
+    public void testHandleExceptionCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(2);
+        schedule(barrier, () -> callback.handleException(ctx, new RuntimeException()));
+
+        synchronized (callback) {
+            barrier.await(5, TimeUnit.SECONDS);
+            callback.wait();
+        }
+
+        assertThrows(ExecutionException.class, () -> callback.get());
+        assertThrows(ExecutionException.class, () -> callback.get(10, TimeUnit.MILLISECONDS));
+        assertThat(callback.isCancelled(), equalTo(false));
+        assertThat(callback.isDone(), equalTo(true));
+    }
+
+    @Test
+    public void testGetResponseContextOnExceptionCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(1);
+        schedule(barrier, () -> callback.handleException(ctx, new RuntimeException()));
+
+        assertThrows(ExecutionException.class, () -> callback.getResponseContext());
+        assertThat(callback.isCancelled(), equalTo(false));
+        assertThat(callback.isDone(), equalTo(true));
+    }
+
+    @Test
+    public void testHandleCancellationCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(2);
+        schedule(barrier, () -> callback.cancel(true));
+
+        synchronized (callback) {
+            barrier.await(5, TimeUnit.SECONDS);
+            callback.wait();
+        }
+
+        assertThrows(InterruptedException.class, () -> callback.get());
+        assertThrows(InterruptedException.class, () -> callback.get(10, TimeUnit.MILLISECONDS));
+        assertThat(callback.isCancelled(), equalTo(true));
+        assertThat(callback.isDone(), equalTo(true));
+    }
+    
+
+    @Test
+    public void testHandleCancellationCallbackWhenStarted() throws Exception {
+        callback.start(new MessageImpl());
+        assertThat(callback.cancel(true), equalTo(false));
+        assertThat(callback.isCancelled(), equalTo(false));
+        assertThat(callback.isDone(), equalTo(false));
+    }
+
+    @Test
+    public void testGetResponseContextOnCancellationCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(1);
+        schedule(barrier, () -> callback.cancel(true));
+
+        assertThrows(InterruptedException.class, () -> callback.getResponseContext());
+        assertThat(callback.isCancelled(), equalTo(true));
+        assertThat(callback.isDone(), equalTo(true));
+    }
+
+    @Test(expected = TimeoutException.class)
+    public void testTimeout() throws Exception {
+        callback.get(10, TimeUnit.MILLISECONDS);
+    }
+
+    private void schedule(CyclicBarrier barrier, Runnable runnable) {
+        executor.schedule(() -> {
+            barrier.await(5, TimeUnit.SECONDS);
+            runnable.run();
+            return null;
+        }, 100, TimeUnit.MILLISECONDS);
+    }
+}
diff --git a/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/JaxwsClientCallback.java b/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/JaxwsClientCallback.java
index ceb1450..28b4c24 100644
--- a/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/JaxwsClientCallback.java
+++ b/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/JaxwsClientCallback.java
@@ -39,10 +39,12 @@ class JaxwsClientCallback<T> extends ClientCallback {
     }
     public void handleResponse(Map<String, Object> ctx, Object[] res) {
         context = ctx;
-        result = res;
+        delegate.complete(res);
+        
         if (handler != null) {
             handler.handleResponse(new Response<T>() {
-
+                protected boolean cancelled;
+                
                 public Map<String, Object> getContext() {
                     return context;
                 }
@@ -54,13 +56,13 @@ class JaxwsClientCallback<T> extends ClientCallback {
 
                 @SuppressWarnings("unchecked")
                 public T get() throws InterruptedException, ExecutionException {
-                    return (T)result[0];
+                    return (T)res[0];
                 }
 
                 @SuppressWarnings("unchecked")
                 public T get(long timeout, TimeUnit unit) throws InterruptedException,
                     ExecutionException, TimeoutException {
-                    return (T)result[0];
+                    return (T)res[0];
                 }
 
                 public boolean isCancelled() {
@@ -73,7 +75,7 @@ class JaxwsClientCallback<T> extends ClientCallback {
 
             });
         }
-        done = true;
+        
         synchronized (this) {
             notifyAll();
         }
@@ -82,9 +84,10 @@ class JaxwsClientCallback<T> extends ClientCallback {
     @Override
     public void handleException(Map<String, Object> ctx, final Throwable ex) {
         context = ctx;
-        exception = mapThrowable(ex);
+        delegate.completeExceptionally(mapThrowable(ex));
         if (handler != null) {
             handler.handleResponse(new Response<T>() {
+                protected boolean cancelled;
 
                 public Map<String, Object> getContext() {
                     return context;
@@ -115,7 +118,7 @@ class JaxwsClientCallback<T> extends ClientCallback {
 
             });
         }
-        done = true;
+
         synchronized (this) {
             notifyAll();
         }
diff --git a/rt/frontend/jaxws/src/test/java/org/apache/cxf/jaxws/JaxwsClientCallbackTest.java b/rt/frontend/jaxws/src/test/java/org/apache/cxf/jaxws/JaxwsClientCallbackTest.java
new file mode 100644
index 0000000..1e86703
--- /dev/null
+++ b/rt/frontend/jaxws/src/test/java/org/apache/cxf/jaxws/JaxwsClientCallbackTest.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.jaxws;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.xml.ws.AsyncHandler;
+import javax.xml.ws.Response;
+
+import org.apache.cxf.message.MessageImpl;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+public class JaxwsClientCallbackTest {
+    private Map<String, Object> ctx;
+    private JaxwsClientCallback<String> callback;
+    private AsyncHandler<String> handler;
+    private ScheduledExecutorService executor;
+    
+    @Before
+    public void setUp() {
+        executor = Executors.newSingleThreadScheduledExecutor();
+        handler = new AsyncHandler<String>() {
+            @Override
+            public void handleResponse(Response<String> res) {
+            }
+        };
+        
+        callback = new JaxwsClientCallback<String>(handler, null);
+        ctx = new HashMap<String, Object>();
+    }
+    
+    @After
+    public void tearDown() throws Exception {
+        executor.shutdown();
+        executor.awaitTermination(5, TimeUnit.SECONDS);
+    }
+    
+    @Test
+    public void testHandleResponseCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(2);
+
+        Object[] result = new Object[0];
+        schedule(barrier, () -> callback.handleResponse(ctx, result));
+        barrier.await(5, TimeUnit.SECONDS);
+        
+        assertThat(callback.get(), equalTo(result));
+        assertThat(callback.get(10, TimeUnit.MILLISECONDS), equalTo(result));
+        assertThat(callback.isCancelled(), equalTo(false));
+        assertThat(callback.isDone(), equalTo(true));
+    }
+    
+    @Test
+    public void testGetResponseContextOnSuccessCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(2);
+
+        Object[] result = new Object[0];
+        schedule(barrier, () -> callback.handleResponse(ctx, result));
+        barrier.await(5, TimeUnit.SECONDS);
+        
+        assertThat(callback.getResponseContext(), equalTo(ctx));
+        assertThat(callback.get(), equalTo(result));
+        assertThat(callback.get(10, TimeUnit.MILLISECONDS), equalTo(result));
+        assertThat(callback.isCancelled(), equalTo(false));
+        assertThat(callback.isDone(), equalTo(true));
+    }
+    
+    @Test
+    public void testHandleExceptionCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(1);
+        schedule(barrier, () -> callback.handleException(ctx, new RuntimeException()));
+
+        assertThrows(ExecutionException.class, () -> callback.get());
+        assertThrows(ExecutionException.class, () -> callback.get(10, TimeUnit.MILLISECONDS));
+        assertThat(callback.isCancelled(), equalTo(false));
+        assertThat(callback.isDone(), equalTo(true));
+    }
+
+    @Test
+    public void testGetResponseContextOnExceptionCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(1);
+        schedule(barrier, () -> callback.handleException(ctx, new RuntimeException()));
+
+        assertThrows(ExecutionException.class, () -> callback.getResponseContext());
+        assertThat(callback.isCancelled(), equalTo(false));
+        assertThat(callback.isDone(), equalTo(true));
+    }
+
+    @Test
+    public void testHandleCancellationCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(2);
+        schedule(barrier, () -> callback.cancel(true));
+        barrier.await(5, TimeUnit.SECONDS);
+
+        assertThrows(InterruptedException.class, () -> callback.get());
+        assertThrows(InterruptedException.class, () -> callback.get(10, TimeUnit.MILLISECONDS));
+        assertThat(callback.isCancelled(), equalTo(true));
+        assertThat(callback.isDone(), equalTo(true));
+    }
+    
+
+    @Test
+    public void testHandleCancellationCallbackWhenStarted() throws Exception {
+        callback.start(new MessageImpl());
+        assertThat(callback.cancel(true), equalTo(false));
+        assertThat(callback.isCancelled(), equalTo(false));
+        assertThat(callback.isDone(), equalTo(false));
+    }
+
+    @Test
+    public void testGetResponseContextOnCancellationCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(1);
+        schedule(barrier, () -> callback.cancel(true));
+
+        assertThrows(InterruptedException.class, () -> callback.getResponseContext());
+        assertThat(callback.isCancelled(), equalTo(true));
+        assertThat(callback.isDone(), equalTo(true));
+    }
+    
+    @Test(expected = TimeoutException.class)
+    public void testTimeout() throws Exception {
+        callback.get(10, TimeUnit.MILLISECONDS);
+    }
+
+    private void schedule(CyclicBarrier barrier, Runnable runnable) {
+        executor.schedule(() -> {
+            barrier.await(5, TimeUnit.SECONDS);
+            runnable.run();
+            return null;
+        }, 100, TimeUnit.MILLISECONDS);
+    }
+}
diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java
index 98443c5..5db63bc 100644
--- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java
+++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java
@@ -71,11 +71,11 @@ public class JaxrsClientCallback<T> extends ClientCallback {
     @SuppressWarnings("unchecked")
     public void handleResponse(Map<String, Object> ctx, Object[] res) {
         context = ctx;
-        result = res;
+        delegate.complete(res);
         if (handler != null) {
             handler.completed((T)res[0]);
         }
-        done = true;
+        
         synchronized (this) {
             notifyAll();
         }
@@ -84,11 +84,11 @@ public class JaxrsClientCallback<T> extends ClientCallback {
     @Override
     public void handleException(Map<String, Object> ctx, final Throwable ex) {
         context = ctx;
-        exception = ex;
+        delegate.completeExceptionally(ex);
         if (handler != null) {
-            handler.failed(exception);
+            handler.failed(ex);
         }
-        done = true;
+        
         synchronized (this) {
             notifyAll();
         }
diff --git a/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/JaxrsClientCallbackTest.java b/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/JaxrsClientCallbackTest.java
new file mode 100644
index 0000000..4f1b124
--- /dev/null
+++ b/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/JaxrsClientCallbackTest.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.jaxrs.client;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.ws.rs.client.InvocationCallback;
+
+import org.apache.cxf.jaxrs.client.JaxrsClientCallback.JaxrsResponseFuture;
+import org.apache.cxf.message.MessageImpl;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+public class JaxrsClientCallbackTest {
+    private Map<String, Object> ctx;
+    private JaxrsClientCallback<String> callback;
+    private InvocationCallback<String> handler;
+    private ScheduledExecutorService executor;
+    private JaxrsResponseFuture<String> future;
+    
+    @Before
+    public void setUp() {
+        executor = Executors.newSingleThreadScheduledExecutor();
+        handler = new InvocationCallback<String>() {
+            @Override
+            public void failed(Throwable throwable) {
+            }
+            
+            @Override
+            public void completed(String response) {
+            }
+        };
+        
+        callback = new JaxrsClientCallback<String>(handler, String.class, null);
+        future = (JaxrsResponseFuture<String>)callback.createFuture();
+        ctx = new HashMap<String, Object>();
+    }
+    
+    @After
+    public void tearDown() throws Exception {
+        executor.shutdown();
+        executor.awaitTermination(5, TimeUnit.SECONDS);
+    }
+    
+    @Test
+    public void testHandleResponseCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(2);
+
+        Object[] result = new String[] {"results"};
+        schedule(barrier, () -> callback.handleResponse(ctx, result));
+        barrier.await(5, TimeUnit.SECONDS);
+        
+        assertThat(future.get(), equalTo("results"));
+        assertThat(future.get(10, TimeUnit.MILLISECONDS), equalTo("results"));
+        assertThat(future.isCancelled(), equalTo(false));
+        assertThat(future.isDone(), equalTo(true));
+    }
+    
+    @Test
+    public void testGetResponseContextOnSuccessCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(2);
+
+        Object[] result = new String[] {"results"};
+        schedule(barrier, () -> callback.handleResponse(ctx, result));
+        barrier.await(5, TimeUnit.SECONDS);
+        
+        assertThat(future.getContext(), equalTo(ctx));
+        assertThat(future.get(), equalTo("results"));
+        assertThat(future.get(10, TimeUnit.MILLISECONDS), equalTo("results"));
+        assertThat(future.isCancelled(), equalTo(false));
+        assertThat(future.isDone(), equalTo(true));
+    }
+    
+    @Test
+    public void testHandleExceptionCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(1);
+        schedule(barrier, () -> callback.handleException(ctx, new RuntimeException()));
+
+        assertThrows(ExecutionException.class, () -> future.get());
+        assertThrows(ExecutionException.class, () -> future.get(10, TimeUnit.MILLISECONDS));
+        assertThat(future.isCancelled(), equalTo(false));
+        assertThat(future.isDone(), equalTo(true));
+    }
+
+    @Test
+    public void testGetResponseContextOnExceptionCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(1);
+        schedule(barrier, () -> callback.handleException(ctx, new RuntimeException()));
+
+        assertThat(future.getContext(), nullValue());
+        assertThat(future.isCancelled(), equalTo(false));
+        assertThat(future.isDone(), equalTo(true));
+    }
+
+    @Test
+    public void testHandleCancellationCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(2);
+        schedule(barrier, () -> future.cancel(true));
+        barrier.await(5, TimeUnit.SECONDS);
+
+        assertThrows(InterruptedException.class, () -> future.get());
+        assertThrows(InterruptedException.class, () -> future.get(10, TimeUnit.MILLISECONDS));
+        assertThat(future.isCancelled(), equalTo(true));
+        assertThat(future.isDone(), equalTo(true));
+    }
+    
+
+    @Test
+    public void testHandleCancellationCallbackWhenStarted() throws Exception {
+        callback.start(new MessageImpl());
+        assertThat(future.cancel(true), equalTo(false));
+        assertThat(future.isCancelled(), equalTo(false));
+        assertThat(future.isDone(), equalTo(false));
+    }
+
+    @Test
+    public void testGetResponseContextOnCancellationCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(1);
+        schedule(barrier, () -> future.cancel(true));
+
+        assertThat(future.getContext(), nullValue());
+        assertThat(future.isCancelled(), equalTo(true));
+        assertThat(future.isDone(), equalTo(true));
+    }
+    
+    @Test(expected = TimeoutException.class)
+    public void testTimeout() throws Exception {
+        future.get(10, TimeUnit.MILLISECONDS);
+    }
+
+    private void schedule(CyclicBarrier barrier, Runnable runnable) {
+        executor.schedule(() -> {
+            barrier.await(5, TimeUnit.SECONDS);
+            runnable.run();
+            return null;
+        }, 100, TimeUnit.MILLISECONDS);
+    }
+}
diff --git a/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MPRestClientCallback.java b/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MPRestClientCallback.java
index a99365f..a37dde9 100644
--- a/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MPRestClientCallback.java
+++ b/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MPRestClientCallback.java
@@ -22,10 +22,6 @@ package org.apache.cxf.microprofile.client;
 import java.lang.reflect.Type;
 import java.security.AccessController;
 import java.security.PrivilegedAction;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.Future;
@@ -55,31 +51,6 @@ public class MPRestClientCallback<T> extends JaxrsClientCallback<T> {
     @SuppressWarnings("unchecked")
     @Override
     public Future<T> createFuture() {
-        return (Future<T>)CompletableFuture.supplyAsync(() -> {
-            synchronized (this) {
-                if (!isDone()) {
-                    try {
-                        this.wait();
-                    } catch (InterruptedException e) {
-                        throw new CompletionException(e);
-                    }
-                }
-            }
-            if (exception != null) {
-                throw new CompletionException(exception);
-            }
-            if (isCancelled()) {
-                throw new CancellationException();
-            }
-            if (!isDone()) {
-                throw new IllegalStateException(
-                    "CompletionStage has been notified, indicating completion, but is not completed.");
-            }
-            try {
-                return get()[0];
-            } catch (InterruptedException | ExecutionException e) {
-                throw new CompletionException(e);
-            }
-        }, executor);
+        return delegate.thenApplyAsync(res -> (T)res[0], executor); // chained on delegate, no wait()
     }
 }
\ No newline at end of file
diff --git a/rt/rs/microprofile-client/src/test/java/org/apache/cxf/microprofile/client/MPRestClientCallbackTest.java b/rt/rs/microprofile-client/src/test/java/org/apache/cxf/microprofile/client/MPRestClientCallbackTest.java
new file mode 100644
index 0000000..ad36f1e
--- /dev/null
+++ b/rt/rs/microprofile-client/src/test/java/org/apache/cxf/microprofile/client/MPRestClientCallbackTest.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.microprofile.client;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.ws.rs.client.InvocationCallback;
+
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+public class MPRestClientCallbackTest { // covers the Future from MPRestClientCallback#createFuture()
+    private Map<String, Object> ctx;
+    private MPRestClientCallback<String> callback;
+    private InvocationCallback<String> handler;
+    private ScheduledExecutorService executor; // runs the delayed tasks created by schedule()
+    private CompletableFuture<String> future; // future under test, created in setUp()
+    private Message message;
+    
+    @Before
+    public void setUp() {
+        executor = Executors.newSingleThreadScheduledExecutor();
+        handler = new InvocationCallback<String>() { // no-op: the tests only inspect the future
+            @Override
+            public void failed(Throwable throwable) {
+            }
+            
+            @Override
+            public void completed(String response) {
+            }
+        };
+        
+        message = new MessageImpl();
+        callback = new MPRestClientCallback<String>(handler, message, String.class, null);
+        future = (CompletableFuture<String>)callback.createFuture(); // needs a CompletableFuture
+        ctx = new HashMap<String, Object>();
+    }
+    
+    @After
+    public void tearDown() throws Exception {
+        executor.shutdown(); // stop accepting new tasks
+        executor.awaitTermination(5, TimeUnit.SECONDS); // wait up to 5 seconds for termination
+    }
+    
+    @Test
+    public void testHandleResponseCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(2); // parties: test thread + scheduled task
+
+        Object[] result = new String[] {"results"};
+        schedule(barrier, () -> callback.handleResponse(ctx, result)); // runs once the barrier trips
+        barrier.await(5, TimeUnit.SECONDS); // release the scheduled task
+        
+        assertThat(future.get(), equalTo("results"));
+        assertThat(future.get(10, TimeUnit.MILLISECONDS), equalTo("results")); // already completed
+        assertThat(future.isCancelled(), equalTo(false));
+        assertThat(future.isDone(), equalTo(true));
+    }
+    
+    @Test
+    public void testGetResponseContextOnSuccessCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(2); // parties: test thread + scheduled task
+
+        Object[] result = new String[] {"results"};
+        schedule(barrier, () -> callback.handleResponse(ctx, result)); // runs once the barrier trips
+        barrier.await(5, TimeUnit.SECONDS); // release the scheduled task
+        
+        assertThat(callback.getResponseContext(), equalTo(ctx)); // same map passed to handleResponse()
+        assertThat(future.get(), equalTo("results"));
+        assertThat(future.get(10, TimeUnit.MILLISECONDS), equalTo("results"));
+        assertThat(future.isCancelled(), equalTo(false));
+        assertThat(future.isDone(), equalTo(true));
+    }
+    
+    @Test
+    public void testHandleExceptionCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(1); // one party: the task's await() returns at once
+        schedule(barrier, () -> callback.handleException(ctx, new RuntimeException()));
+
+        assertThrows(ExecutionException.class, () -> future.get()); // waits for the scheduled failure
+        assertThrows(ExecutionException.class, () -> future.get(10, TimeUnit.MILLISECONDS));
+        assertThat(future.isCancelled(), equalTo(false)); // a failure is not a cancellation
+        assertThat(future.isDone(), equalTo(true));
+    }
+
+    @Test
+    public void testGetResponseContextOnExceptionCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(1); // one party: the task's await() returns at once
+        schedule(barrier, () -> callback.handleException(ctx, new RuntimeException()));
+
+        assertThrows(ExecutionException.class, () -> callback.getResponseContext());
+        assertThat(future.isCancelled(), equalTo(false));
+        assertThat(future.isDone(), equalTo(true));
+    }
+
+    @Test
+    public void testHandleCancellationCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(2); // parties: test thread + scheduled task
+        schedule(barrier, () -> future.cancel(true)); // cancels the future returned by createFuture()
+        barrier.await(5, TimeUnit.SECONDS); // release the scheduled task
+
+        assertThrows(CancellationException.class, () -> future.get());
+        assertThrows(CancellationException.class, () -> future.get(10, TimeUnit.MILLISECONDS));
+        assertThat(future.isCancelled(), equalTo(true));
+        assertThat(future.isDone(), equalTo(true));
+    }
+
+    @Test
+    public void testHandleCancellationCallbackWhenStarted() throws Exception {
+        callback.start(message); // start() is called before the cancel attempt
+        assertThat(future.cancel(true), equalTo(true)); // expected: cancel() still succeeds after start()
+        assertThat(future.isCancelled(), equalTo(true));
+        assertThat(future.isDone(), equalTo(true));
+    }
+
+    @Test(expected = TimeoutException.class)
+    public void testTimeout() throws Exception {
+        future.get(10, TimeUnit.MILLISECONDS); // nothing in this test completes the future
+    }
+
+    private void schedule(CyclicBarrier barrier, Runnable runnable) {
+        executor.schedule(() -> { // value-returning lambda, so await()'s checked exceptions are allowed
+            barrier.await(5, TimeUnit.SECONDS); // wait up to 5 seconds for the barrier to trip
+            runnable.run();
+            return null;
+        }, 100, TimeUnit.MILLISECONDS); // the task starts after a 100 ms delay
+    }
+}