You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2020/03/29 15:38:34 UTC
[cxf] 01/02: CXF-8242: Stop blocking executor thread on microprofile rest asynchronous call (#654)
This is an automated email from the ASF dual-hosted git repository.
reta pushed a commit to branch 3.2.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git
commit d7e6668f5c3fac8842e932f2f16998aa06099698
Author: Andriy Redko <dr...@gmail.com>
AuthorDate: Sat Mar 28 16:32:33 2020 -0400
CXF-8242: Stop blocking executor thread on microprofile rest asynchronous call (#654)
(cherry picked from commit 3921fc29c265addfa36d0b4260903eef96ac1e25)
(cherry picked from commit 6b602d3c267c5c73b2bced1c5a8368f0d6a06da1)
# Conflicts:
# rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MPRestClientCallback.java
---
.../org/apache/cxf/endpoint/ClientCallback.java | 79 ++++------
.../apache/cxf/endpoint/ClientCallbackTest.java | 164 ++++++++++++++++++++
.../org/apache/cxf/jaxws/JaxwsClientCallback.java | 17 ++-
.../apache/cxf/jaxws/JaxwsClientCallbackTest.java | 162 ++++++++++++++++++++
.../apache/cxf/jaxrs/client/ClientProxyImpl.java | 3 +-
.../cxf/jaxrs/client/JaxrsClientCallback.java | 10 +-
.../cxf/jaxrs/client/JaxrsClientCallbackTest.java | 169 +++++++++++++++++++++
.../microprofile/client/MPRestClientCallback.java | 76 ++-------
.../client/proxy/MicroProfileClientProxyImpl.java | 3 +-
.../client/MPRestClientCallbackTest.java | 161 ++++++++++++++++++++
10 files changed, 719 insertions(+), 125 deletions(-)
diff --git a/core/src/main/java/org/apache/cxf/endpoint/ClientCallback.java b/core/src/main/java/org/apache/cxf/endpoint/ClientCallback.java
index 061044c..0fc9ece 100644
--- a/core/src/main/java/org/apache/cxf/endpoint/ClientCallback.java
+++ b/core/src/main/java/org/apache/cxf/endpoint/ClientCallback.java
@@ -20,6 +20,8 @@
package org.apache.cxf.endpoint;
import java.util.Map;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -40,12 +42,8 @@ import org.apache.cxf.message.Message;
* </ol>
*/
public class ClientCallback implements Future<Object[]> {
-
+ protected final CompletableFuture<Object[]> delegate = new CompletableFuture<Object[]>();
protected Map<String, Object> context;
- protected Object[] result;
- protected Throwable exception;
- protected volatile boolean done;
- protected boolean cancelled;
protected boolean started;
public ClientCallback() {
@@ -72,8 +70,8 @@ public class ClientCallback implements Future<Object[]> {
*/
public void handleResponse(Map<String, Object> ctx, Object[] res) {
context = ctx;
- result = res;
- done = true;
+ delegate.complete(res);
+
synchronized (this) {
notifyAll();
}
@@ -91,8 +89,8 @@ public class ClientCallback implements Future<Object[]> {
*/
public void handleException(Map<String, Object> ctx, Throwable ex) {
context = ctx;
- exception = ex;
- done = true;
+ delegate.completeExceptionally(ex);
+
synchronized (this) {
notifyAll();
}
@@ -101,10 +99,12 @@ public class ClientCallback implements Future<Object[]> {
public boolean cancel(boolean mayInterruptIfRunning) {
if (!started) {
- cancelled = true;
+ delegate.cancel(mayInterruptIfRunning);
+
synchronized (this) {
notifyAll();
}
+
return true;
}
return false;
@@ -118,15 +118,15 @@ public class ClientCallback implements Future<Object[]> {
*/
public Map<String, Object> getResponseContext() throws InterruptedException, ExecutionException {
synchronized (this) {
- if (!done) {
+ if (!delegate.isDone()) {
wait();
}
}
- if (cancelled) {
+ if (delegate.isCancelled()) {
throw new InterruptedException("Operation Cancelled");
}
- if (exception != null) {
- throw new ExecutionException(exception);
+ if (delegate.isCompletedExceptionally()) {
+ delegate.get();
}
return context;
}
@@ -135,55 +135,32 @@ public class ClientCallback implements Future<Object[]> {
* {@inheritDoc}
*/
public Object[] get() throws InterruptedException, ExecutionException {
- synchronized (this) {
- if (!done) {
- wait();
- }
+ try {
+ return delegate.get();
+ } catch (final CancellationException ex) {
+ // Preserving the exception raised by former implementation
+ throw new InterruptedException("Operation has been cancelled");
}
- if (cancelled) {
- throw new InterruptedException("Operation Cancelled");
- }
- if (exception != null) {
- throw new ExecutionException(exception);
- }
- return result;
}
/**
* {@inheritDoc}
*/
- public Object[] get(long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException {
- synchronized (this) {
- if (!done) {
- unit.timedWait(this, timeout);
- }
- }
- if (cancelled) {
- throw new InterruptedException("Operation Cancelled");
- }
- if (!done) {
- throw new TimeoutException("Timeout Exceeded");
- }
- if (exception != null) {
- throw new ExecutionException(exception);
+ public Object[] get(long timeout, TimeUnit unit) throws InterruptedException,
+ ExecutionException, TimeoutException {
+ try {
+ return delegate.get(timeout, unit);
+ } catch (final CancellationException ex) {
+ // Preserving the exception raised by former implementation
+ throw new InterruptedException("Operation has been cancelled");
}
- return result;
}
public boolean isCancelled() {
- return cancelled;
+ return delegate.isCancelled();
}
public boolean isDone() {
- return done;
+ return delegate.isDone();
}
-
- /*
- * If the operation completes with a fault, the resulting exception object ends up here.
- */
- public Throwable getException() {
- return exception;
- }
-
}
diff --git a/core/src/test/java/org/apache/cxf/endpoint/ClientCallbackTest.java b/core/src/test/java/org/apache/cxf/endpoint/ClientCallbackTest.java
new file mode 100644
index 0000000..2a3f74a
--- /dev/null
+++ b/core/src/test/java/org/apache/cxf/endpoint/ClientCallbackTest.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.endpoint;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.cxf.message.MessageImpl;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+public class ClientCallbackTest {
+ private Map<String, Object> ctx;
+ private ClientCallback callback;
+ private ScheduledExecutorService executor;
+
+ @Before
+ public void setUp() {
+ executor = Executors.newSingleThreadScheduledExecutor();
+ callback = new ClientCallback();
+ ctx = new HashMap<String, Object>();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ executor.shutdown();
+ executor.awaitTermination(5, TimeUnit.SECONDS);
+ }
+
+ @Test
+ public void testHandleResponseCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(2);
+
+ Object[] result = new Object[0];
+ schedule(barrier, () -> callback.handleResponse(ctx, result));
+
+ synchronized (callback) {
+ barrier.await(5, TimeUnit.SECONDS);
+ callback.wait();
+ }
+
+ assertThat(callback.get(), equalTo(result));
+ assertThat(callback.get(10, TimeUnit.MILLISECONDS), equalTo(result));
+ assertThat(callback.isCancelled(), equalTo(false));
+ assertThat(callback.isDone(), equalTo(true));
+ }
+
+ @Test
+ public void testGetResponseContextOnSuccessCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(1);
+
+ Object[] result = new Object[0];
+ schedule(barrier, () -> callback.handleResponse(ctx, result));
+
+ assertThat(callback.getResponseContext(), equalTo(ctx));
+ assertThat(callback.get(), equalTo(result));
+ assertThat(callback.get(10, TimeUnit.MILLISECONDS), equalTo(result));
+ assertThat(callback.isCancelled(), equalTo(false));
+ assertThat(callback.isDone(), equalTo(true));
+ }
+
+ @Test
+ public void testHandleExceptionCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(2);
+ schedule(barrier, () -> callback.handleException(ctx, new RuntimeException()));
+
+ synchronized (callback) {
+ barrier.await(5, TimeUnit.SECONDS);
+ callback.wait();
+ }
+
+ assertThrows(ExecutionException.class, () -> callback.get());
+ assertThrows(ExecutionException.class, () -> callback.get(10, TimeUnit.MILLISECONDS));
+ assertThat(callback.isCancelled(), equalTo(false));
+ assertThat(callback.isDone(), equalTo(true));
+ }
+
+ @Test
+ public void testGetResponseContextOnExceptionCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(1);
+ schedule(barrier, () -> callback.handleException(ctx, new RuntimeException()));
+
+ assertThrows(ExecutionException.class, () -> callback.getResponseContext());
+ assertThat(callback.isCancelled(), equalTo(false));
+ assertThat(callback.isDone(), equalTo(true));
+ }
+
+ @Test
+ public void testHandleCancellationCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(2);
+ schedule(barrier, () -> callback.cancel(true));
+
+ synchronized (callback) {
+ barrier.await(5, TimeUnit.SECONDS);
+ callback.wait();
+ }
+
+ assertThrows(InterruptedException.class, () -> callback.get());
+ assertThrows(InterruptedException.class, () -> callback.get(10, TimeUnit.MILLISECONDS));
+ assertThat(callback.isCancelled(), equalTo(true));
+ assertThat(callback.isDone(), equalTo(true));
+ }
+
+
+ @Test
+ public void testHandleCancellationCallbackWhenStarted() throws Exception {
+ callback.start(new MessageImpl());
+ assertThat(callback.cancel(true), equalTo(false));
+ assertThat(callback.isCancelled(), equalTo(false));
+ assertThat(callback.isDone(), equalTo(false));
+ }
+
+ @Test
+ public void testGetResponseContextOnCancellationCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(1);
+ schedule(barrier, () -> callback.cancel(true));
+
+ assertThrows(InterruptedException.class, () -> callback.getResponseContext());
+ assertThat(callback.isCancelled(), equalTo(true));
+ assertThat(callback.isDone(), equalTo(true));
+ }
+
+ @Test(expected = TimeoutException.class)
+ public void testTimeout() throws Exception {
+ callback.get(10, TimeUnit.MILLISECONDS);
+ }
+
+ private void schedule(CyclicBarrier barrier, Runnable runnable) {
+ executor.schedule(() -> {
+ barrier.await(5, TimeUnit.SECONDS);
+ runnable.run();
+ return null;
+ }, 100, TimeUnit.MILLISECONDS);
+ }
+}
diff --git a/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/JaxwsClientCallback.java b/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/JaxwsClientCallback.java
index ceb1450..28b4c24 100644
--- a/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/JaxwsClientCallback.java
+++ b/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/JaxwsClientCallback.java
@@ -39,10 +39,12 @@ class JaxwsClientCallback<T> extends ClientCallback {
}
public void handleResponse(Map<String, Object> ctx, Object[] res) {
context = ctx;
- result = res;
+ delegate.complete(res);
+
if (handler != null) {
handler.handleResponse(new Response<T>() {
-
+ protected boolean cancelled;
+
public Map<String, Object> getContext() {
return context;
}
@@ -54,13 +56,13 @@ class JaxwsClientCallback<T> extends ClientCallback {
@SuppressWarnings("unchecked")
public T get() throws InterruptedException, ExecutionException {
- return (T)result[0];
+ return (T)res[0];
}
@SuppressWarnings("unchecked")
public T get(long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException, TimeoutException {
- return (T)result[0];
+ return (T)res[0];
}
public boolean isCancelled() {
@@ -73,7 +75,7 @@ class JaxwsClientCallback<T> extends ClientCallback {
});
}
- done = true;
+
synchronized (this) {
notifyAll();
}
@@ -82,9 +84,10 @@ class JaxwsClientCallback<T> extends ClientCallback {
@Override
public void handleException(Map<String, Object> ctx, final Throwable ex) {
context = ctx;
- exception = mapThrowable(ex);
+ delegate.completeExceptionally(mapThrowable(ex));
if (handler != null) {
handler.handleResponse(new Response<T>() {
+ protected boolean cancelled;
public Map<String, Object> getContext() {
return context;
@@ -115,7 +118,7 @@ class JaxwsClientCallback<T> extends ClientCallback {
});
}
- done = true;
+
synchronized (this) {
notifyAll();
}
diff --git a/rt/frontend/jaxws/src/test/java/org/apache/cxf/jaxws/JaxwsClientCallbackTest.java b/rt/frontend/jaxws/src/test/java/org/apache/cxf/jaxws/JaxwsClientCallbackTest.java
new file mode 100644
index 0000000..1e86703
--- /dev/null
+++ b/rt/frontend/jaxws/src/test/java/org/apache/cxf/jaxws/JaxwsClientCallbackTest.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.jaxws;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.xml.ws.AsyncHandler;
+import javax.xml.ws.Response;
+
+import org.apache.cxf.message.MessageImpl;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+public class JaxwsClientCallbackTest {
+ private Map<String, Object> ctx;
+ private JaxwsClientCallback<String> callback;
+ private AsyncHandler<String> handler;
+ private ScheduledExecutorService executor;
+
+ @Before
+ public void setUp() {
+ executor = Executors.newSingleThreadScheduledExecutor();
+ handler = new AsyncHandler<String>() {
+ @Override
+ public void handleResponse(Response<String> res) {
+ }
+ };
+
+ callback = new JaxwsClientCallback<String>(handler, null);
+ ctx = new HashMap<String, Object>();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ executor.shutdown();
+ executor.awaitTermination(5, TimeUnit.SECONDS);
+ }
+
+ @Test
+ public void testHandleResponseCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(2);
+
+ Object[] result = new Object[0];
+ schedule(barrier, () -> callback.handleResponse(ctx, result));
+ barrier.await(5, TimeUnit.SECONDS);
+
+ assertThat(callback.get(), equalTo(result));
+ assertThat(callback.get(10, TimeUnit.MILLISECONDS), equalTo(result));
+ assertThat(callback.isCancelled(), equalTo(false));
+ assertThat(callback.isDone(), equalTo(true));
+ }
+
+ @Test
+ public void testGetResponseContextOnSuccessCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(2);
+
+ Object[] result = new Object[0];
+ schedule(barrier, () -> callback.handleResponse(ctx, result));
+ barrier.await(5, TimeUnit.SECONDS);
+
+ assertThat(callback.getResponseContext(), equalTo(ctx));
+ assertThat(callback.get(), equalTo(result));
+ assertThat(callback.get(10, TimeUnit.MILLISECONDS), equalTo(result));
+ assertThat(callback.isCancelled(), equalTo(false));
+ assertThat(callback.isDone(), equalTo(true));
+ }
+
+ @Test
+ public void testHandleExceptionCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(1);
+ schedule(barrier, () -> callback.handleException(ctx, new RuntimeException()));
+
+ assertThrows(ExecutionException.class, () -> callback.get());
+ assertThrows(ExecutionException.class, () -> callback.get(10, TimeUnit.MILLISECONDS));
+ assertThat(callback.isCancelled(), equalTo(false));
+ assertThat(callback.isDone(), equalTo(true));
+ }
+
+ @Test
+ public void testGetResponseContextOnExceptionCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(1);
+ schedule(barrier, () -> callback.handleException(ctx, new RuntimeException()));
+
+ assertThrows(ExecutionException.class, () -> callback.getResponseContext());
+ assertThat(callback.isCancelled(), equalTo(false));
+ assertThat(callback.isDone(), equalTo(true));
+ }
+
+ @Test
+ public void testHandleCancellationCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(2);
+ schedule(barrier, () -> callback.cancel(true));
+ barrier.await(5, TimeUnit.SECONDS);
+
+ assertThrows(InterruptedException.class, () -> callback.get());
+ assertThrows(InterruptedException.class, () -> callback.get(10, TimeUnit.MILLISECONDS));
+ assertThat(callback.isCancelled(), equalTo(true));
+ assertThat(callback.isDone(), equalTo(true));
+ }
+
+
+ @Test
+ public void testHandleCancellationCallbackWhenStarted() throws Exception {
+ callback.start(new MessageImpl());
+ assertThat(callback.cancel(true), equalTo(false));
+ assertThat(callback.isCancelled(), equalTo(false));
+ assertThat(callback.isDone(), equalTo(false));
+ }
+
+ @Test
+ public void testGetResponseContextOnCancellationCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(1);
+ schedule(barrier, () -> callback.cancel(true));
+
+ assertThrows(InterruptedException.class, () -> callback.getResponseContext());
+ assertThat(callback.isCancelled(), equalTo(true));
+ assertThat(callback.isDone(), equalTo(true));
+ }
+
+ @Test(expected = TimeoutException.class)
+ public void testTimeout() throws Exception {
+ callback.get(10, TimeUnit.MILLISECONDS);
+ }
+
+ private void schedule(CyclicBarrier barrier, Runnable runnable) {
+ executor.schedule(() -> {
+ barrier.await(5, TimeUnit.SECONDS);
+ runnable.run();
+ return null;
+ }, 100, TimeUnit.MILLISECONDS);
+ }
+}
diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/ClientProxyImpl.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/ClientProxyImpl.java
index 59de875..e48a449 100644
--- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/ClientProxyImpl.java
+++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/ClientProxyImpl.java
@@ -921,7 +921,7 @@ public class ClientProxyImpl extends AbstractClient implements
InvocationCallback<Object> asyncCallback) {
outMessage.getExchange().setSynchronous(false);
setAsyncMessageObserverIfNeeded(outMessage.getExchange());
- JaxrsClientCallback<?> cb = newJaxrsClientCallback(asyncCallback,
+ JaxrsClientCallback<?> cb = newJaxrsClientCallback(asyncCallback, outMessage,
ori.getMethodToInvoke().getReturnType(), ori.getMethodToInvoke().getGenericReturnType());
outMessage.getExchange().put(JaxrsClientCallback.class, cb);
doRunInterceptorChain(outMessage);
@@ -930,6 +930,7 @@ public class ClientProxyImpl extends AbstractClient implements
}
protected JaxrsClientCallback<?> newJaxrsClientCallback(InvocationCallback<Object> asyncCallback,
+ Message outMessage,
Class<?> responseClass,
Type outGenericType) {
return new JaxrsClientCallback<>(asyncCallback, responseClass, outGenericType);
diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java
index 98443c5..5db63bc 100644
--- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java
+++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java
@@ -71,11 +71,11 @@ public class JaxrsClientCallback<T> extends ClientCallback {
@SuppressWarnings("unchecked")
public void handleResponse(Map<String, Object> ctx, Object[] res) {
context = ctx;
- result = res;
+ delegate.complete(res);
if (handler != null) {
handler.completed((T)res[0]);
}
- done = true;
+
synchronized (this) {
notifyAll();
}
@@ -84,11 +84,11 @@ public class JaxrsClientCallback<T> extends ClientCallback {
@Override
public void handleException(Map<String, Object> ctx, final Throwable ex) {
context = ctx;
- exception = ex;
+ delegate.completeExceptionally(ex);
if (handler != null) {
- handler.failed(exception);
+ handler.failed(ex);
}
- done = true;
+
synchronized (this) {
notifyAll();
}
diff --git a/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/JaxrsClientCallbackTest.java b/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/JaxrsClientCallbackTest.java
new file mode 100644
index 0000000..4f1b124
--- /dev/null
+++ b/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/JaxrsClientCallbackTest.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.jaxrs.client;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.ws.rs.client.InvocationCallback;
+
+import org.apache.cxf.jaxrs.client.JaxrsClientCallback.JaxrsResponseFuture;
+import org.apache.cxf.message.MessageImpl;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+public class JaxrsClientCallbackTest {
+ private Map<String, Object> ctx;
+ private JaxrsClientCallback<String> callback;
+ private InvocationCallback<String> handler;
+ private ScheduledExecutorService executor;
+ private JaxrsResponseFuture<String> future;
+
+ @Before
+ public void setUp() {
+ executor = Executors.newSingleThreadScheduledExecutor();
+ handler = new InvocationCallback<String>() {
+ @Override
+ public void failed(Throwable throwable) {
+ }
+
+ @Override
+ public void completed(String response) {
+ }
+ };
+
+ callback = new JaxrsClientCallback<String>(handler, String.class, null);
+ future = (JaxrsResponseFuture<String>)callback.createFuture();
+ ctx = new HashMap<String, Object>();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ executor.shutdown();
+ executor.awaitTermination(5, TimeUnit.SECONDS);
+ }
+
+ @Test
+ public void testHandleResponseCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(2);
+
+ Object[] result = new String[] {"results"};
+ schedule(barrier, () -> callback.handleResponse(ctx, result));
+ barrier.await(5, TimeUnit.SECONDS);
+
+ assertThat(future.get(), equalTo("results"));
+ assertThat(future.get(10, TimeUnit.MILLISECONDS), equalTo("results"));
+ assertThat(future.isCancelled(), equalTo(false));
+ assertThat(future.isDone(), equalTo(true));
+ }
+
+ @Test
+ public void testGetResponseContextOnSuccessCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(2);
+
+ Object[] result = new String[] {"results"};
+ schedule(barrier, () -> callback.handleResponse(ctx, result));
+ barrier.await(5, TimeUnit.SECONDS);
+
+ assertThat(future.getContext(), equalTo(ctx));
+ assertThat(future.get(), equalTo("results"));
+ assertThat(future.get(10, TimeUnit.MILLISECONDS), equalTo("results"));
+ assertThat(future.isCancelled(), equalTo(false));
+ assertThat(future.isDone(), equalTo(true));
+ }
+
+ @Test
+ public void testHandleExceptionCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(1);
+ schedule(barrier, () -> callback.handleException(ctx, new RuntimeException()));
+
+ assertThrows(ExecutionException.class, () -> future.get());
+ assertThrows(ExecutionException.class, () -> future.get(10, TimeUnit.MILLISECONDS));
+ assertThat(future.isCancelled(), equalTo(false));
+ assertThat(future.isDone(), equalTo(true));
+ }
+
+ @Test
+ public void testGetResponseContextOnExceptionCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(1);
+ schedule(barrier, () -> callback.handleException(ctx, new RuntimeException()));
+
+ assertThat(future.getContext(), nullValue());
+ assertThat(future.isCancelled(), equalTo(false));
+ assertThat(future.isDone(), equalTo(true));
+ }
+
+ @Test
+ public void testHandleCancellationCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(2);
+ schedule(barrier, () -> future.cancel(true));
+ barrier.await(5, TimeUnit.SECONDS);
+
+ assertThrows(InterruptedException.class, () -> future.get());
+ assertThrows(InterruptedException.class, () -> future.get(10, TimeUnit.MILLISECONDS));
+ assertThat(future.isCancelled(), equalTo(true));
+ assertThat(future.isDone(), equalTo(true));
+ }
+
+
+ @Test
+ public void testHandleCancellationCallbackWhenStarted() throws Exception {
+ callback.start(new MessageImpl());
+ assertThat(future.cancel(true), equalTo(false));
+ assertThat(future.isCancelled(), equalTo(false));
+ assertThat(future.isDone(), equalTo(false));
+ }
+
+ @Test
+ public void testGetResponseContextOnCancellationCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(1);
+ schedule(barrier, () -> future.cancel(true));
+
+ assertThat(future.getContext(), nullValue());
+ assertThat(future.isCancelled(), equalTo(true));
+ assertThat(future.isDone(), equalTo(true));
+ }
+
+ @Test(expected = TimeoutException.class)
+ public void testTimeout() throws Exception {
+ future.get(10, TimeUnit.MILLISECONDS);
+ }
+
+ private void schedule(CyclicBarrier barrier, Runnable runnable) {
+ executor.schedule(() -> {
+ barrier.await(5, TimeUnit.SECONDS);
+ runnable.run();
+ return null;
+ }, 100, TimeUnit.MILLISECONDS);
+ }
+}
diff --git a/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MPRestClientCallback.java b/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MPRestClientCallback.java
index a9c8857..a37dde9 100644
--- a/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MPRestClientCallback.java
+++ b/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MPRestClientCallback.java
@@ -20,81 +20,37 @@
package org.apache.cxf.microprofile.client;
import java.lang.reflect.Type;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import javax.ws.rs.client.InvocationCallback;
import org.apache.cxf.jaxrs.client.JaxrsClientCallback;
+import org.apache.cxf.message.Message;
public class MPRestClientCallback<T> extends JaxrsClientCallback<T> {
+ private final ExecutorService executor;
public MPRestClientCallback(InvocationCallback<T> handler,
+ Message outMessage,
Class<?> responseClass,
Type outGenericType) {
super(handler, responseClass, outGenericType);
+ ExecutorService es = outMessage.get(ExecutorService.class);
+ if (es == null) {
+ es = AccessController.doPrivileged((PrivilegedAction<ExecutorService>)() -> {
+ return ForkJoinPool.commonPool();
+ });
+ }
+ executor = es;
}
+ @SuppressWarnings("unchecked")
@Override
public Future<T> createFuture() {
- return new MPRestClientResponseFuture<T>(this);
- }
-
- static class MPRestClientResponseFuture<T> extends CompletableFuture<T> implements Future<T> {
- MPRestClientCallback<T> callback;
- MPRestClientResponseFuture(MPRestClientCallback<T> cb) {
- callback = cb;
- }
-
- public Map<String, Object> getContext() {
- try {
- return callback.getResponseContext();
- } catch (Exception ex) {
- return null;
- }
- }
- public boolean cancel(boolean mayInterruptIfRunning) {
- return callback.cancel(mayInterruptIfRunning);
- }
-
- public T get() throws InterruptedException, ExecutionException {
- try {
- return getObject(callback.get()[0]);
- } catch (InterruptedException ex) {
- InvocationCallback<T> handler = callback.getHandler();
- if (handler != null) {
- handler.failed(ex);
- }
- throw ex;
- }
- }
- public T get(long timeout, TimeUnit unit) throws InterruptedException,
- ExecutionException, TimeoutException {
- try {
- return getObject(callback.get(timeout, unit)[0]);
- } catch (InterruptedException ex) {
- InvocationCallback<T> handler = callback.getHandler();
- if (handler != null) {
- handler.failed(ex);
- }
- throw ex;
- }
- }
-
- @SuppressWarnings("unchecked")
- private T getObject(Object object) {
- return (T)object;
- }
-
- public boolean isCancelled() {
- return callback.isCancelled();
- }
- public boolean isDone() {
- return callback.isDone();
- }
+ return delegate.thenApplyAsync(res -> (T)res[0], executor);
}
}
\ No newline at end of file
diff --git a/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/proxy/MicroProfileClientProxyImpl.java b/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/proxy/MicroProfileClientProxyImpl.java
index 46dee88..2bd02ee 100644
--- a/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/proxy/MicroProfileClientProxyImpl.java
+++ b/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/proxy/MicroProfileClientProxyImpl.java
@@ -111,9 +111,10 @@ public class MicroProfileClientProxyImpl extends ClientProxyImpl {
@Override
protected JaxrsClientCallback<?> newJaxrsClientCallback(InvocationCallback<Object> asyncCallback,
+ Message outMessage,
Class<?> responseClass,
Type outGenericType) {
- return new MPRestClientCallback<Object>(asyncCallback, responseClass, outGenericType);
+ return new MPRestClientCallback<Object>(asyncCallback, outMessage, responseClass, outGenericType);
}
@Override
diff --git a/rt/rs/microprofile-client/src/test/java/org/apache/cxf/microprofile/client/MPRestClientCallbackTest.java b/rt/rs/microprofile-client/src/test/java/org/apache/cxf/microprofile/client/MPRestClientCallbackTest.java
new file mode 100644
index 0000000..ad36f1e
--- /dev/null
+++ b/rt/rs/microprofile-client/src/test/java/org/apache/cxf/microprofile/client/MPRestClientCallbackTest.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.microprofile.client;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.ws.rs.client.InvocationCallback;
+
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+public class MPRestClientCallbackTest { // tests the Future exposed by MPRestClientCallback#createFuture
+ private Map<String, Object> ctx;
+ private MPRestClientCallback<String> callback;
+ private InvocationCallback<String> handler;
+ private ScheduledExecutorService executor;
+ private CompletableFuture<String> future;
+ private Message message;
+ // Builds a fresh callback (no-op handler, String response type) and its future for each test.
+ @Before
+ public void setUp() {
+ executor = Executors.newSingleThreadScheduledExecutor();
+ handler = new InvocationCallback<String>() {
+ @Override
+ public void failed(Throwable throwable) {
+ }
+
+ @Override
+ public void completed(String response) {
+ }
+ };
+
+ message = new MessageImpl();
+ callback = new MPRestClientCallback<String>(handler, message, String.class, null);
+ future = (CompletableFuture<String>)callback.createFuture();
+ ctx = new HashMap<String, Object>();
+ }
+ // Shuts the scheduler down and waits up to 5 seconds for it to terminate.
+ @After
+ public void tearDown() throws Exception {
+ executor.shutdown();
+ executor.awaitTermination(5, TimeUnit.SECONDS);
+ }
+ // A response handed over on the scheduler thread completes the future with its payload.
+ @Test
+ public void testHandleResponseCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(2);
+
+ Object[] result = new String[] {"results"};
+ schedule(barrier, () -> callback.handleResponse(ctx, result));
+ barrier.await(5, TimeUnit.SECONDS);
+
+ assertThat(future.get(), equalTo("results"));
+ assertThat(future.get(10, TimeUnit.MILLISECONDS), equalTo("results"));
+ assertThat(future.isCancelled(), equalTo(false));
+ assertThat(future.isDone(), equalTo(true));
+ }
+ // After a successful response, getResponseContext() returns the map given to handleResponse.
+ @Test
+ public void testGetResponseContextOnSuccessCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(2);
+
+ Object[] result = new String[] {"results"};
+ schedule(barrier, () -> callback.handleResponse(ctx, result));
+ barrier.await(5, TimeUnit.SECONDS);
+
+ assertThat(callback.getResponseContext(), equalTo(ctx));
+ assertThat(future.get(), equalTo("results"));
+ assertThat(future.get(10, TimeUnit.MILLISECONDS), equalTo("results"));
+ assertThat(future.isCancelled(), equalTo(false));
+ assertThat(future.isDone(), equalTo(true));
+ }
+ // A failure reported via handleException surfaces from both get() variants as ExecutionException.
+ @Test
+ public void testHandleExceptionCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(1);
+ schedule(barrier, () -> callback.handleException(ctx, new RuntimeException()));
+ // No rendezvous here (single-party barrier); the blocking future.get() waits for the failure.
+ assertThrows(ExecutionException.class, () -> future.get());
+ assertThrows(ExecutionException.class, () -> future.get(10, TimeUnit.MILLISECONDS));
+ assertThat(future.isCancelled(), equalTo(false));
+ assertThat(future.isDone(), equalTo(true));
+ }
+ // After a failure, getResponseContext() is expected to throw ExecutionException as well.
+ @Test
+ public void testGetResponseContextOnExceptionCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(1);
+ schedule(barrier, () -> callback.handleException(ctx, new RuntimeException()));
+
+ assertThrows(ExecutionException.class, () -> callback.getResponseContext());
+ assertThat(future.isCancelled(), equalTo(false));
+ assertThat(future.isDone(), equalTo(true));
+ }
+ // Cancelling from the scheduler thread makes both get() variants throw CancellationException.
+ @Test
+ public void testHandleCancellationCallback() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(2);
+ schedule(barrier, () -> future.cancel(true));
+ barrier.await(5, TimeUnit.SECONDS);
+
+ assertThrows(CancellationException.class, () -> future.get());
+ assertThrows(CancellationException.class, () -> future.get(10, TimeUnit.MILLISECONDS));
+ assertThat(future.isCancelled(), equalTo(true));
+ assertThat(future.isDone(), equalTo(true));
+ }
+ // Cancellation must still succeed once start(message) has been called on the callback.
+ @Test
+ public void testHandleCancellationCallbackWhenStarted() throws Exception {
+ callback.start(message);
+ assertThat(future.cancel(true), equalTo(true));
+ assertThat(future.isCancelled(), equalTo(true));
+ assertThat(future.isDone(), equalTo(true));
+ }
+ // Nothing completes the callback here, so the timed get() has to give up with TimeoutException.
+ @Test(expected = TimeoutException.class)
+ public void testTimeout() throws Exception {
+ future.get(10, TimeUnit.MILLISECONDS);
+ }
+ // Runs the task on the scheduler after 100 ms, once the barrier has tripped (5 s limit).
+ private void schedule(CyclicBarrier barrier, Runnable runnable) {
+ executor.schedule(() -> {
+ barrier.await(5, TimeUnit.SECONDS);
+ runnable.run();
+ return null; // returning a value makes the lambda a Callable, so await() may throw checked exceptions
+ }, 100, TimeUnit.MILLISECONDS);
+ }
+}