You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2020/03/29 13:48:21 UTC
[cxf] branch 3.3.x-fixes updated: CXF-8242: Stop blocking executor
thread on microprofile rest asynchronous call (#654)
This is an automated email from the ASF dual-hosted git repository.
reta pushed a commit to branch 3.3.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git
The following commit(s) were added to refs/heads/3.3.x-fixes by this push:
new 6b602d3 CXF-8242: Stop blocking executor thread on microprofile rest asynchronous call (#654)
6b602d3 is described below
commit 6b602d3c267c5c73b2bced1c5a8368f0d6a06da1
Author: Andriy Redko <dr...@gmail.com>
AuthorDate: Sat Mar 28 16:32:33 2020 -0400
CXF-8242: Stop blocking executor thread on microprofile rest asynchronous call (#654)
(cherry picked from commit 3921fc29c265addfa36d0b4260903eef96ac1e25)
---
.../org/apache/cxf/endpoint/ClientCallback.java | 79 ++++------
.../apache/cxf/endpoint/ClientCallbackTest.java | 164 ++++++++++++++++++++
.../org/apache/cxf/jaxws/JaxwsClientCallback.java | 17 ++-
.../apache/cxf/jaxws/JaxwsClientCallbackTest.java | 162 ++++++++++++++++++++
.../cxf/jaxrs/client/JaxrsClientCallback.java | 10 +-
.../cxf/jaxrs/client/JaxrsClientCallbackTest.java | 169 +++++++++++++++++++++
.../microprofile/client/MPRestClientCallback.java | 31 +---
.../client/MPRestClientCallbackTest.java | 161 ++++++++++++++++++++
8 files changed, 700 insertions(+), 93 deletions(-)
diff --git a/core/src/main/java/org/apache/cxf/endpoint/ClientCallback.java b/core/src/main/java/org/apache/cxf/endpoint/ClientCallback.java
index ed35858..b9563c7 100644
--- a/core/src/main/java/org/apache/cxf/endpoint/ClientCallback.java
+++ b/core/src/main/java/org/apache/cxf/endpoint/ClientCallback.java
@@ -20,6 +20,8 @@
package org.apache.cxf.endpoint;
import java.util.Map;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -40,12 +42,8 @@ import org.apache.cxf.message.Message;
* </ol>
*/
public class ClientCallback implements Future<Object[]> {
-
+ protected final CompletableFuture<Object[]> delegate = new CompletableFuture<Object[]>();
protected Map<String, Object> context;
- protected Object[] result;
- protected Throwable exception;
- protected volatile boolean done;
- protected boolean cancelled;
protected boolean started;
public ClientCallback() {
@@ -72,8 +70,8 @@ public class ClientCallback implements Future<Object[]> {
*/
public void handleResponse(Map<String, Object> ctx, Object[] res) {
context = ctx;
- result = res;
- done = true;
+ delegate.complete(res);
+
synchronized (this) {
notifyAll();
}
@@ -91,8 +89,8 @@ public class ClientCallback implements Future<Object[]> {
*/
public void handleException(Map<String, Object> ctx, Throwable ex) {
context = ctx;
- exception = ex;
- done = true;
+ delegate.completeExceptionally(ex);
+
synchronized (this) {
notifyAll();
}
@@ -101,10 +99,12 @@ public class ClientCallback implements Future<Object[]> {
public boolean cancel(boolean mayInterruptIfRunning) {
if (!started) {
- cancelled = true;
+ delegate.cancel(mayInterruptIfRunning);
+
synchronized (this) {
notifyAll();
}
+
return true;
}
return false;
@@ -118,15 +118,15 @@ public class ClientCallback implements Future<Object[]> {
*/
public Map<String, Object> getResponseContext() throws InterruptedException, ExecutionException {
synchronized (this) {
- if (!done) {
+ if (!delegate.isDone()) {
wait();
}
}
- if (cancelled) {
+ if (delegate.isCancelled()) {
throw new InterruptedException("Operation Cancelled");
}
- if (exception != null) {
- throw new ExecutionException(exception);
+ if (delegate.isCompletedExceptionally()) {
+ delegate.get();
}
return context;
}
@@ -135,55 +135,32 @@ public class ClientCallback implements Future<Object[]> {
* {@inheritDoc}
*/
public Object[] get() throws InterruptedException, ExecutionException {
- synchronized (this) {
- if (!done) {
- wait();
- }
+ try {
+ return delegate.get();
+ } catch (final CancellationException ex) {
+ // Preserving the exception raised by former implementation
+ throw new InterruptedException("Operation has been cancelled");
}
- if (cancelled) {
- throw new InterruptedException("Operation Cancelled");
- }
- if (exception != null) {
- throw new ExecutionException(exception);
- }
- return result;
}
/**
* {@inheritDoc}
*/
- public Object[] get(long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException {
- synchronized (this) {
- if (!done) {
- unit.timedWait(this, timeout);
- }
- }
- if (cancelled) {
- throw new InterruptedException("Operation Cancelled");
- }
- if (!done) {
- throw new TimeoutException("Timeout Exceeded");
- }
- if (exception != null) {
- throw new ExecutionException(exception);
+ public Object[] get(long timeout, TimeUnit unit) throws InterruptedException,
+ ExecutionException, TimeoutException {
+ try {
+ return delegate.get(timeout, unit);
+ } catch (final CancellationException ex) {
+ // Preserving the exception raised by former implementation
+ throw new InterruptedException("Operation has been cancelled");
}
- return result;
}
public boolean isCancelled() {
- return cancelled;
+ return delegate.isCancelled();
}
public boolean isDone() {
- return done;
+ return delegate.isDone();
}
-
- /*
- * If the operation completes with a fault, the resulting exception object ends up here.
- */
- public Throwable getException() {
- return exception;
- }
-
}
diff --git a/core/src/test/java/org/apache/cxf/endpoint/ClientCallbackTest.java b/core/src/test/java/org/apache/cxf/endpoint/ClientCallbackTest.java
new file mode 100644
index 0000000..2a3f74a
--- /dev/null
+++ b/core/src/test/java/org/apache/cxf/endpoint/ClientCallbackTest.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.endpoint;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.cxf.message.MessageImpl;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+public class ClientCallbackTest {
+ private Map<String, Object> ctx;
+ private ClientCallback callback;
+ private ScheduledExecutorService executor;
+
+ @Before
+ public void setUp() {
+ executor = Executors.newSingleThreadScheduledExecutor();
+ callback = new ClientCallback();
+ ctx = new HashMap<String, Object>();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ executor.shutdown();
+ executor.awaitTermination(5, TimeUnit.SECONDS);
+ }
+
+ @Test
+ public void testHandleResponseCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(2);
+
+ Object[] result = new Object[0];
+ schedule(barrier, () -> callback.handleResponse(ctx, result));
+
+ synchronized (callback) {
+ barrier.await(5, TimeUnit.SECONDS);
+ callback.wait();
+ }
+
+ assertThat(callback.get(), equalTo(result));
+ assertThat(callback.get(10, TimeUnit.MILLISECONDS), equalTo(result));
+ assertThat(callback.isCancelled(), equalTo(false));
+ assertThat(callback.isDone(), equalTo(true));
+ }
+
+ @Test
+ public void testGetResponseContextOnSuccessCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(1);
+
+ Object[] result = new Object[0];
+ schedule(barrier, () -> callback.handleResponse(ctx, result));
+
+ assertThat(callback.getResponseContext(), equalTo(ctx));
+ assertThat(callback.get(), equalTo(result));
+ assertThat(callback.get(10, TimeUnit.MILLISECONDS), equalTo(result));
+ assertThat(callback.isCancelled(), equalTo(false));
+ assertThat(callback.isDone(), equalTo(true));
+ }
+
+ @Test
+ public void testHandleExceptionCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(2);
+ schedule(barrier, () -> callback.handleException(ctx, new RuntimeException()));
+
+ synchronized (callback) {
+ barrier.await(5, TimeUnit.SECONDS);
+ callback.wait();
+ }
+
+ assertThrows(ExecutionException.class, () -> callback.get());
+ assertThrows(ExecutionException.class, () -> callback.get(10, TimeUnit.MILLISECONDS));
+ assertThat(callback.isCancelled(), equalTo(false));
+ assertThat(callback.isDone(), equalTo(true));
+ }
+
+ @Test
+ public void testGetResponseContextOnExceptionCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(1);
+ schedule(barrier, () -> callback.handleException(ctx, new RuntimeException()));
+
+ assertThrows(ExecutionException.class, () -> callback.getResponseContext());
+ assertThat(callback.isCancelled(), equalTo(false));
+ assertThat(callback.isDone(), equalTo(true));
+ }
+
+ @Test
+ public void testHandleCancellationCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(2);
+ schedule(barrier, () -> callback.cancel(true));
+
+ synchronized (callback) {
+ barrier.await(5, TimeUnit.SECONDS);
+ callback.wait();
+ }
+
+ assertThrows(InterruptedException.class, () -> callback.get());
+ assertThrows(InterruptedException.class, () -> callback.get(10, TimeUnit.MILLISECONDS));
+ assertThat(callback.isCancelled(), equalTo(true));
+ assertThat(callback.isDone(), equalTo(true));
+ }
+
+
+ @Test
+ public void testHandleCancellationCallbackWhenStarted() throws Exception {
+ callback.start(new MessageImpl());
+ assertThat(callback.cancel(true), equalTo(false));
+ assertThat(callback.isCancelled(), equalTo(false));
+ assertThat(callback.isDone(), equalTo(false));
+ }
+
+ @Test
+ public void testGetResponseContextOnCancellationCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(1);
+ schedule(barrier, () -> callback.cancel(true));
+
+ assertThrows(InterruptedException.class, () -> callback.getResponseContext());
+ assertThat(callback.isCancelled(), equalTo(true));
+ assertThat(callback.isDone(), equalTo(true));
+ }
+
+ @Test(expected = TimeoutException.class)
+ public void testTimeout() throws Exception {
+ callback.get(10, TimeUnit.MILLISECONDS);
+ }
+
+ private void schedule(CyclicBarrier barrier, Runnable runnable) {
+ executor.schedule(() -> {
+ barrier.await(5, TimeUnit.SECONDS);
+ runnable.run();
+ return null;
+ }, 100, TimeUnit.MILLISECONDS);
+ }
+}
diff --git a/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/JaxwsClientCallback.java b/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/JaxwsClientCallback.java
index ceb1450..28b4c24 100644
--- a/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/JaxwsClientCallback.java
+++ b/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/JaxwsClientCallback.java
@@ -39,10 +39,12 @@ class JaxwsClientCallback<T> extends ClientCallback {
}
public void handleResponse(Map<String, Object> ctx, Object[] res) {
context = ctx;
- result = res;
+ delegate.complete(res);
+
if (handler != null) {
handler.handleResponse(new Response<T>() {
-
+ protected boolean cancelled;
+
public Map<String, Object> getContext() {
return context;
}
@@ -54,13 +56,13 @@ class JaxwsClientCallback<T> extends ClientCallback {
@SuppressWarnings("unchecked")
public T get() throws InterruptedException, ExecutionException {
- return (T)result[0];
+ return (T)res[0];
}
@SuppressWarnings("unchecked")
public T get(long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException, TimeoutException {
- return (T)result[0];
+ return (T)res[0];
}
public boolean isCancelled() {
@@ -73,7 +75,7 @@ class JaxwsClientCallback<T> extends ClientCallback {
});
}
- done = true;
+
synchronized (this) {
notifyAll();
}
@@ -82,9 +84,10 @@ class JaxwsClientCallback<T> extends ClientCallback {
@Override
public void handleException(Map<String, Object> ctx, final Throwable ex) {
context = ctx;
- exception = mapThrowable(ex);
+ delegate.completeExceptionally(mapThrowable(ex));
if (handler != null) {
handler.handleResponse(new Response<T>() {
+ protected boolean cancelled;
public Map<String, Object> getContext() {
return context;
@@ -115,7 +118,7 @@ class JaxwsClientCallback<T> extends ClientCallback {
});
}
- done = true;
+
synchronized (this) {
notifyAll();
}
diff --git a/rt/frontend/jaxws/src/test/java/org/apache/cxf/jaxws/JaxwsClientCallbackTest.java b/rt/frontend/jaxws/src/test/java/org/apache/cxf/jaxws/JaxwsClientCallbackTest.java
new file mode 100644
index 0000000..1e86703
--- /dev/null
+++ b/rt/frontend/jaxws/src/test/java/org/apache/cxf/jaxws/JaxwsClientCallbackTest.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.jaxws;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.xml.ws.AsyncHandler;
+import javax.xml.ws.Response;
+
+import org.apache.cxf.message.MessageImpl;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+public class JaxwsClientCallbackTest {
+ private Map<String, Object> ctx;
+ private JaxwsClientCallback<String> callback;
+ private AsyncHandler<String> handler;
+ private ScheduledExecutorService executor;
+
+ @Before
+ public void setUp() {
+ executor = Executors.newSingleThreadScheduledExecutor();
+ handler = new AsyncHandler<String>() {
+ @Override
+ public void handleResponse(Response<String> res) {
+ }
+ };
+
+ callback = new JaxwsClientCallback<String>(handler, null);
+ ctx = new HashMap<String, Object>();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ executor.shutdown();
+ executor.awaitTermination(5, TimeUnit.SECONDS);
+ }
+
+ @Test
+ public void testHandleResponseCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(2);
+
+ Object[] result = new Object[0];
+ schedule(barrier, () -> callback.handleResponse(ctx, result));
+ barrier.await(5, TimeUnit.SECONDS);
+
+ assertThat(callback.get(), equalTo(result));
+ assertThat(callback.get(10, TimeUnit.MILLISECONDS), equalTo(result));
+ assertThat(callback.isCancelled(), equalTo(false));
+ assertThat(callback.isDone(), equalTo(true));
+ }
+
+ @Test
+ public void testGetResponseContextOnSuccessCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(2);
+
+ Object[] result = new Object[0];
+ schedule(barrier, () -> callback.handleResponse(ctx, result));
+ barrier.await(5, TimeUnit.SECONDS);
+
+ assertThat(callback.getResponseContext(), equalTo(ctx));
+ assertThat(callback.get(), equalTo(result));
+ assertThat(callback.get(10, TimeUnit.MILLISECONDS), equalTo(result));
+ assertThat(callback.isCancelled(), equalTo(false));
+ assertThat(callback.isDone(), equalTo(true));
+ }
+
+ @Test
+ public void testHandleExceptionCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(1);
+ schedule(barrier, () -> callback.handleException(ctx, new RuntimeException()));
+
+ assertThrows(ExecutionException.class, () -> callback.get());
+ assertThrows(ExecutionException.class, () -> callback.get(10, TimeUnit.MILLISECONDS));
+ assertThat(callback.isCancelled(), equalTo(false));
+ assertThat(callback.isDone(), equalTo(true));
+ }
+
+ @Test
+ public void testGetResponseContextOnExceptionCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(1);
+ schedule(barrier, () -> callback.handleException(ctx, new RuntimeException()));
+
+ assertThrows(ExecutionException.class, () -> callback.getResponseContext());
+ assertThat(callback.isCancelled(), equalTo(false));
+ assertThat(callback.isDone(), equalTo(true));
+ }
+
+ @Test
+ public void testHandleCancellationCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(2);
+ schedule(barrier, () -> callback.cancel(true));
+ barrier.await(5, TimeUnit.SECONDS);
+
+ assertThrows(InterruptedException.class, () -> callback.get());
+ assertThrows(InterruptedException.class, () -> callback.get(10, TimeUnit.MILLISECONDS));
+ assertThat(callback.isCancelled(), equalTo(true));
+ assertThat(callback.isDone(), equalTo(true));
+ }
+
+
+ @Test
+ public void testHandleCancellationCallbackWhenStarted() throws Exception {
+ callback.start(new MessageImpl());
+ assertThat(callback.cancel(true), equalTo(false));
+ assertThat(callback.isCancelled(), equalTo(false));
+ assertThat(callback.isDone(), equalTo(false));
+ }
+
+ @Test
+ public void testGetResponseContextOnCancellationCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(1);
+ schedule(barrier, () -> callback.cancel(true));
+
+ assertThrows(InterruptedException.class, () -> callback.getResponseContext());
+ assertThat(callback.isCancelled(), equalTo(true));
+ assertThat(callback.isDone(), equalTo(true));
+ }
+
+ @Test(expected = TimeoutException.class)
+ public void testTimeout() throws Exception {
+ callback.get(10, TimeUnit.MILLISECONDS);
+ }
+
+ private void schedule(CyclicBarrier barrier, Runnable runnable) {
+ executor.schedule(() -> {
+ barrier.await(5, TimeUnit.SECONDS);
+ runnable.run();
+ return null;
+ }, 100, TimeUnit.MILLISECONDS);
+ }
+}
diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java
index 98443c5..5db63bc 100644
--- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java
+++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java
@@ -71,11 +71,11 @@ public class JaxrsClientCallback<T> extends ClientCallback {
@SuppressWarnings("unchecked")
public void handleResponse(Map<String, Object> ctx, Object[] res) {
context = ctx;
- result = res;
+ delegate.complete(res);
if (handler != null) {
handler.completed((T)res[0]);
}
- done = true;
+
synchronized (this) {
notifyAll();
}
@@ -84,11 +84,11 @@ public class JaxrsClientCallback<T> extends ClientCallback {
@Override
public void handleException(Map<String, Object> ctx, final Throwable ex) {
context = ctx;
- exception = ex;
+ delegate.completeExceptionally(ex);
if (handler != null) {
- handler.failed(exception);
+ handler.failed(ex);
}
- done = true;
+
synchronized (this) {
notifyAll();
}
diff --git a/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/JaxrsClientCallbackTest.java b/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/JaxrsClientCallbackTest.java
new file mode 100644
index 0000000..4f1b124
--- /dev/null
+++ b/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/JaxrsClientCallbackTest.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.jaxrs.client;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.ws.rs.client.InvocationCallback;
+
+import org.apache.cxf.jaxrs.client.JaxrsClientCallback.JaxrsResponseFuture;
+import org.apache.cxf.message.MessageImpl;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+public class JaxrsClientCallbackTest {
+ private Map<String, Object> ctx;
+ private JaxrsClientCallback<String> callback;
+ private InvocationCallback<String> handler;
+ private ScheduledExecutorService executor;
+ private JaxrsResponseFuture<String> future;
+
+ @Before
+ public void setUp() {
+ executor = Executors.newSingleThreadScheduledExecutor();
+ handler = new InvocationCallback<String>() {
+ @Override
+ public void failed(Throwable throwable) {
+ }
+
+ @Override
+ public void completed(String response) {
+ }
+ };
+
+ callback = new JaxrsClientCallback<String>(handler, String.class, null);
+ future = (JaxrsResponseFuture<String>)callback.createFuture();
+ ctx = new HashMap<String, Object>();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ executor.shutdown();
+ executor.awaitTermination(5, TimeUnit.SECONDS);
+ }
+
+ @Test
+ public void testHandleResponseCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(2);
+
+ Object[] result = new String[] {"results"};
+ schedule(barrier, () -> callback.handleResponse(ctx, result));
+ barrier.await(5, TimeUnit.SECONDS);
+
+ assertThat(future.get(), equalTo("results"));
+ assertThat(future.get(10, TimeUnit.MILLISECONDS), equalTo("results"));
+ assertThat(future.isCancelled(), equalTo(false));
+ assertThat(future.isDone(), equalTo(true));
+ }
+
+ @Test
+ public void testGetResponseContextOnSuccessCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(2);
+
+ Object[] result = new String[] {"results"};
+ schedule(barrier, () -> callback.handleResponse(ctx, result));
+ barrier.await(5, TimeUnit.SECONDS);
+
+ assertThat(future.getContext(), equalTo(ctx));
+ assertThat(future.get(), equalTo("results"));
+ assertThat(future.get(10, TimeUnit.MILLISECONDS), equalTo("results"));
+ assertThat(future.isCancelled(), equalTo(false));
+ assertThat(future.isDone(), equalTo(true));
+ }
+
+ @Test
+ public void testHandleExceptionCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(1);
+ schedule(barrier, () -> callback.handleException(ctx, new RuntimeException()));
+
+ assertThrows(ExecutionException.class, () -> future.get());
+ assertThrows(ExecutionException.class, () -> future.get(10, TimeUnit.MILLISECONDS));
+ assertThat(future.isCancelled(), equalTo(false));
+ assertThat(future.isDone(), equalTo(true));
+ }
+
+ @Test
+ public void testGetResponseContextOnExceptionCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(1);
+ schedule(barrier, () -> callback.handleException(ctx, new RuntimeException()));
+
+ assertThat(future.getContext(), nullValue());
+ assertThat(future.isCancelled(), equalTo(false));
+ assertThat(future.isDone(), equalTo(true));
+ }
+
+ @Test
+ public void testHandleCancellationCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(2);
+ schedule(barrier, () -> future.cancel(true));
+ barrier.await(5, TimeUnit.SECONDS);
+
+ assertThrows(InterruptedException.class, () -> future.get());
+ assertThrows(InterruptedException.class, () -> future.get(10, TimeUnit.MILLISECONDS));
+ assertThat(future.isCancelled(), equalTo(true));
+ assertThat(future.isDone(), equalTo(true));
+ }
+
+
+ @Test
+ public void testHandleCancellationCallbackWhenStarted() throws Exception {
+ callback.start(new MessageImpl());
+ assertThat(future.cancel(true), equalTo(false));
+ assertThat(future.isCancelled(), equalTo(false));
+ assertThat(future.isDone(), equalTo(false));
+ }
+
+ @Test
+ public void testGetResponseContextOnCancellationCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(1);
+ schedule(barrier, () -> future.cancel(true));
+
+ assertThat(future.getContext(), nullValue());
+ assertThat(future.isCancelled(), equalTo(true));
+ assertThat(future.isDone(), equalTo(true));
+ }
+
+ @Test(expected = TimeoutException.class)
+ public void testTimeout() throws Exception {
+ future.get(10, TimeUnit.MILLISECONDS);
+ }
+
+ private void schedule(CyclicBarrier barrier, Runnable runnable) {
+ executor.schedule(() -> {
+ barrier.await(5, TimeUnit.SECONDS);
+ runnable.run();
+ return null;
+ }, 100, TimeUnit.MILLISECONDS);
+ }
+}
diff --git a/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MPRestClientCallback.java b/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MPRestClientCallback.java
index a99365f..a37dde9 100644
--- a/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MPRestClientCallback.java
+++ b/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MPRestClientCallback.java
@@ -22,10 +22,6 @@ package org.apache.cxf.microprofile.client;
import java.lang.reflect.Type;
import java.security.AccessController;
import java.security.PrivilegedAction;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
@@ -55,31 +51,6 @@ public class MPRestClientCallback<T> extends JaxrsClientCallback<T> {
@SuppressWarnings("unchecked")
@Override
public Future<T> createFuture() {
- return (Future<T>)CompletableFuture.supplyAsync(() -> {
- synchronized (this) {
- if (!isDone()) {
- try {
- this.wait();
- } catch (InterruptedException e) {
- throw new CompletionException(e);
- }
- }
- }
- if (exception != null) {
- throw new CompletionException(exception);
- }
- if (isCancelled()) {
- throw new CancellationException();
- }
- if (!isDone()) {
- throw new IllegalStateException(
- "CompletionStage has been notified, indicating completion, but is not completed.");
- }
- try {
- return get()[0];
- } catch (InterruptedException | ExecutionException e) {
- throw new CompletionException(e);
- }
- }, executor);
+ return delegate.thenApplyAsync(res -> (T)res[0], executor);
}
}
\ No newline at end of file
diff --git a/rt/rs/microprofile-client/src/test/java/org/apache/cxf/microprofile/client/MPRestClientCallbackTest.java b/rt/rs/microprofile-client/src/test/java/org/apache/cxf/microprofile/client/MPRestClientCallbackTest.java
new file mode 100644
index 0000000..ad36f1e
--- /dev/null
+++ b/rt/rs/microprofile-client/src/test/java/org/apache/cxf/microprofile/client/MPRestClientCallbackTest.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.microprofile.client;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.ws.rs.client.InvocationCallback;
+
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+public class MPRestClientCallbackTest {
+ private Map<String, Object> ctx;
+ private MPRestClientCallback<String> callback;
+ private InvocationCallback<String> handler;
+ private ScheduledExecutorService executor;
+ private CompletableFuture<String> future;
+ private Message message;
+
+ @Before
+ public void setUp() {
+ executor = Executors.newSingleThreadScheduledExecutor();
+ handler = new InvocationCallback<String>() {
+ @Override
+ public void failed(Throwable throwable) {
+ }
+
+ @Override
+ public void completed(String response) {
+ }
+ };
+
+ message = new MessageImpl();
+ callback = new MPRestClientCallback<String>(handler, message, String.class, null);
+ future = (CompletableFuture<String>)callback.createFuture();
+ ctx = new HashMap<String, Object>();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ executor.shutdown();
+ executor.awaitTermination(5, TimeUnit.SECONDS);
+ }
+
+ @Test
+ public void testHandleResponseCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(2);
+
+ Object[] result = new String[] {"results"};
+ schedule(barrier, () -> callback.handleResponse(ctx, result));
+ barrier.await(5, TimeUnit.SECONDS);
+
+ assertThat(future.get(), equalTo("results"));
+ assertThat(future.get(10, TimeUnit.MILLISECONDS), equalTo("results"));
+ assertThat(future.isCancelled(), equalTo(false));
+ assertThat(future.isDone(), equalTo(true));
+ }
+
+ @Test
+ public void testGetResponseContextOnSuccessCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(2);
+
+ Object[] result = new String[] {"results"};
+ schedule(barrier, () -> callback.handleResponse(ctx, result));
+ barrier.await(5, TimeUnit.SECONDS);
+
+ assertThat(callback.getResponseContext(), equalTo(ctx));
+ assertThat(future.get(), equalTo("results"));
+ assertThat(future.get(10, TimeUnit.MILLISECONDS), equalTo("results"));
+ assertThat(future.isCancelled(), equalTo(false));
+ assertThat(future.isDone(), equalTo(true));
+ }
+
+ @Test
+ public void testHandleExceptionCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(1);
+ schedule(barrier, () -> callback.handleException(ctx, new RuntimeException()));
+
+ assertThrows(ExecutionException.class, () -> future.get());
+ assertThrows(ExecutionException.class, () -> future.get(10, TimeUnit.MILLISECONDS));
+ assertThat(future.isCancelled(), equalTo(false));
+ assertThat(future.isDone(), equalTo(true));
+ }
+
+ @Test
+ public void testGetResponseContextOnExceptionCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(1);
+ schedule(barrier, () -> callback.handleException(ctx, new RuntimeException()));
+
+ assertThrows(ExecutionException.class, () -> callback.getResponseContext());
+ assertThat(future.isCancelled(), equalTo(false));
+ assertThat(future.isDone(), equalTo(true));
+ }
+
+ @Test
+ public void testHandleCancellationCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(2);
+ schedule(barrier, () -> future.cancel(true));
+ barrier.await(5, TimeUnit.SECONDS);
+
+ assertThrows(CancellationException.class, () -> future.get());
+ assertThrows(CancellationException.class, () -> future.get(10, TimeUnit.MILLISECONDS));
+ assertThat(future.isCancelled(), equalTo(true));
+ assertThat(future.isDone(), equalTo(true));
+ }
+
+ @Test
+ public void testHandleCancellationCallbackWhenStarted() throws Exception {
+ callback.start(message);
+ assertThat(future.cancel(true), equalTo(true));
+ assertThat(future.isCancelled(), equalTo(true));
+ assertThat(future.isDone(), equalTo(true));
+ }
+
+ @Test(expected = TimeoutException.class)
+ public void testTimeout() throws Exception {
+ future.get(10, TimeUnit.MILLISECONDS);
+ }
+
+ private void schedule(CyclicBarrier barrier, Runnable runnable) {
+ executor.schedule(() -> {
+ barrier.await(5, TimeUnit.SECONDS);
+ runnable.run();
+ return null;
+ }, 100, TimeUnit.MILLISECONDS);
+ }
+}