You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2020/03/29 15:38:33 UTC

[cxf] branch 3.2.x-fixes updated (c4449f9 -> 0166c96)

This is an automated email from the ASF dual-hosted git repository.

reta pushed a change to branch 3.2.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git.


    from c4449f9  Recording .gitmergeinfo Changes
     new d7e6668  CXF-8242: Stop blocking executor thread on microprofile rest asynchronous call (#654)
     new 0166c96  Recording .gitmergeinfo Changes

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .gitmergeinfo                                      |   1 +
 .../org/apache/cxf/endpoint/ClientCallback.java    |  79 ++++------
 .../apache/cxf/endpoint/ClientCallbackTest.java    | 164 ++++++++++++++++++++
 .../org/apache/cxf/jaxws/JaxwsClientCallback.java  |  17 ++-
 .../apache/cxf/jaxws/JaxwsClientCallbackTest.java  | 162 ++++++++++++++++++++
 .../apache/cxf/jaxrs/client/ClientProxyImpl.java   |   3 +-
 .../cxf/jaxrs/client/JaxrsClientCallback.java      |  10 +-
 .../cxf/jaxrs/client/JaxrsClientCallbackTest.java  | 169 +++++++++++++++++++++
 .../microprofile/client/MPRestClientCallback.java  |  76 ++-------
 .../client/proxy/MicroProfileClientProxyImpl.java  |   3 +-
 .../client/MPRestClientCallbackTest.java           | 161 ++++++++++++++++++++
 11 files changed, 720 insertions(+), 125 deletions(-)
 create mode 100644 core/src/test/java/org/apache/cxf/endpoint/ClientCallbackTest.java
 create mode 100644 rt/frontend/jaxws/src/test/java/org/apache/cxf/jaxws/JaxwsClientCallbackTest.java
 create mode 100644 rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/JaxrsClientCallbackTest.java
 create mode 100644 rt/rs/microprofile-client/src/test/java/org/apache/cxf/microprofile/client/MPRestClientCallbackTest.java


[cxf] 01/02: CXF-8242: Stop blocking executor thread on microprofile rest asynchronous call (#654)

Posted by re...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch 3.2.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git

commit d7e6668f5c3fac8842e932f2f16998aa06099698
Author: Andriy Redko <dr...@gmail.com>
AuthorDate: Sat Mar 28 16:32:33 2020 -0400

    CXF-8242: Stop blocking executor thread on microprofile rest asynchronous call (#654)
    
    (cherry picked from commit 3921fc29c265addfa36d0b4260903eef96ac1e25)
    (cherry picked from commit 6b602d3c267c5c73b2bced1c5a8368f0d6a06da1)
    
    # Conflicts:
    #	rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MPRestClientCallback.java
---
 .../org/apache/cxf/endpoint/ClientCallback.java    |  79 ++++------
 .../apache/cxf/endpoint/ClientCallbackTest.java    | 164 ++++++++++++++++++++
 .../org/apache/cxf/jaxws/JaxwsClientCallback.java  |  17 ++-
 .../apache/cxf/jaxws/JaxwsClientCallbackTest.java  | 162 ++++++++++++++++++++
 .../apache/cxf/jaxrs/client/ClientProxyImpl.java   |   3 +-
 .../cxf/jaxrs/client/JaxrsClientCallback.java      |  10 +-
 .../cxf/jaxrs/client/JaxrsClientCallbackTest.java  | 169 +++++++++++++++++++++
 .../microprofile/client/MPRestClientCallback.java  |  76 ++-------
 .../client/proxy/MicroProfileClientProxyImpl.java  |   3 +-
 .../client/MPRestClientCallbackTest.java           | 161 ++++++++++++++++++++
 10 files changed, 719 insertions(+), 125 deletions(-)

diff --git a/core/src/main/java/org/apache/cxf/endpoint/ClientCallback.java b/core/src/main/java/org/apache/cxf/endpoint/ClientCallback.java
index 061044c..0fc9ece 100644
--- a/core/src/main/java/org/apache/cxf/endpoint/ClientCallback.java
+++ b/core/src/main/java/org/apache/cxf/endpoint/ClientCallback.java
@@ -20,6 +20,8 @@
 package org.apache.cxf.endpoint;
 
 import java.util.Map;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -40,12 +42,8 @@ import org.apache.cxf.message.Message;
  * </ol>
  */
 public class ClientCallback implements Future<Object[]> {
-
+    protected final CompletableFuture<Object[]> delegate = new CompletableFuture<Object[]>();
     protected Map<String, Object> context;
-    protected Object[] result;
-    protected Throwable exception;
-    protected volatile boolean done;
-    protected boolean cancelled;
     protected boolean started;
 
     public ClientCallback() {
@@ -72,8 +70,8 @@ public class ClientCallback implements Future<Object[]> {
      */
     public void handleResponse(Map<String, Object> ctx, Object[] res) {
         context = ctx;
-        result = res;
-        done = true;
+        delegate.complete(res);
+
         synchronized (this) {
             notifyAll();
         }
@@ -91,8 +89,8 @@ public class ClientCallback implements Future<Object[]> {
      */
     public void handleException(Map<String, Object> ctx, Throwable ex) {
         context = ctx;
-        exception = ex;
-        done = true;
+        delegate.completeExceptionally(ex);
+        
         synchronized (this) {
             notifyAll();
         }
@@ -101,10 +99,12 @@ public class ClientCallback implements Future<Object[]> {
 
     public boolean cancel(boolean mayInterruptIfRunning) {
         if (!started) {
-            cancelled = true;
+            delegate.cancel(mayInterruptIfRunning);
+            
             synchronized (this) {
                 notifyAll();
             }
+            
             return true;
         }
         return false;
@@ -118,15 +118,15 @@ public class ClientCallback implements Future<Object[]> {
      */
     public Map<String, Object> getResponseContext() throws InterruptedException, ExecutionException {
         synchronized (this) {
-            if (!done) {
+            if (!delegate.isDone()) {
                 wait();
             }
         }
-        if (cancelled) {
+        if (delegate.isCancelled()) {
             throw new InterruptedException("Operation Cancelled");
         }
-        if (exception != null) {
-            throw new ExecutionException(exception);
+        if (delegate.isCompletedExceptionally()) {
+            delegate.get();
         }
         return context;
     }
@@ -135,55 +135,32 @@ public class ClientCallback implements Future<Object[]> {
      * {@inheritDoc}
      */
     public Object[] get() throws InterruptedException, ExecutionException {
-        synchronized (this) {
-            if (!done) {
-                wait();
-            }
+        try {
+            return delegate.get();
+        } catch (final CancellationException ex) {
+            // Preserving the exception raised by former implementation
+            throw new InterruptedException("Operation has been cancelled");
         }
-        if (cancelled) {
-            throw new InterruptedException("Operation Cancelled");
-        }
-        if (exception != null) {
-            throw new ExecutionException(exception);
-        }
-        return result;
     }
 
     /**
      * {@inheritDoc}
      */
-    public Object[] get(long timeout, TimeUnit unit)
-        throws InterruptedException, ExecutionException, TimeoutException {
-        synchronized (this) {
-            if (!done) {
-                unit.timedWait(this, timeout);
-            }
-        }
-        if (cancelled) {
-            throw new InterruptedException("Operation Cancelled");
-        }
-        if (!done) {
-            throw new TimeoutException("Timeout Exceeded");
-        }
-        if (exception != null) {
-            throw new ExecutionException(exception);
+    public Object[] get(long timeout, TimeUnit unit) throws InterruptedException, 
+            ExecutionException, TimeoutException {
+        try {
+            return delegate.get(timeout, unit);
+        } catch (final CancellationException ex) {
+            // Preserving the exception raised by former implementation
+            throw new InterruptedException("Operation has been cancelled");
         }
-        return result;
     }
 
     public boolean isCancelled() {
-        return cancelled;
+        return delegate.isCancelled();
     }
 
     public boolean isDone() {
-        return done;
+        return delegate.isDone();
     }
-
-    /*
-     * If the operation completes with a fault, the resulting exception object ends up here.
-     */
-    public Throwable getException() {
-        return exception;
-    }
-
 }
diff --git a/core/src/test/java/org/apache/cxf/endpoint/ClientCallbackTest.java b/core/src/test/java/org/apache/cxf/endpoint/ClientCallbackTest.java
new file mode 100644
index 0000000..2a3f74a
--- /dev/null
+++ b/core/src/test/java/org/apache/cxf/endpoint/ClientCallbackTest.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.endpoint;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.cxf.message.MessageImpl;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+public class ClientCallbackTest {
+    private Map<String, Object> ctx;
+    private ClientCallback callback;
+    private ScheduledExecutorService executor;
+    
+    @Before
+    public void setUp() {
+        executor = Executors.newSingleThreadScheduledExecutor();
+        callback = new ClientCallback();
+        ctx = new HashMap<String, Object>();
+    }
+    
+    @After
+    public void tearDown() throws Exception {
+        executor.shutdown();
+        executor.awaitTermination(5, TimeUnit.SECONDS);
+    }
+    
+    @Test
+    public void testHandleResponseCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(2);
+
+        Object[] result = new Object[0];
+        schedule(barrier, () -> callback.handleResponse(ctx, result));
+
+        synchronized (callback) {
+            barrier.await(5, TimeUnit.SECONDS);
+            callback.wait();
+        }
+
+        assertThat(callback.get(), equalTo(result));
+        assertThat(callback.get(10, TimeUnit.MILLISECONDS), equalTo(result));
+        assertThat(callback.isCancelled(), equalTo(false));
+        assertThat(callback.isDone(), equalTo(true));
+    }
+    
+    @Test
+    public void testGetResponseContextOnSuccessCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(1);
+
+        Object[] result = new Object[0];
+        schedule(barrier, () -> callback.handleResponse(ctx, result));
+
+        assertThat(callback.getResponseContext(), equalTo(ctx));
+        assertThat(callback.get(), equalTo(result));
+        assertThat(callback.get(10, TimeUnit.MILLISECONDS), equalTo(result));
+        assertThat(callback.isCancelled(), equalTo(false));
+        assertThat(callback.isDone(), equalTo(true));
+    }
+    
+    @Test
+    public void testHandleExceptionCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(2);
+        schedule(barrier, () -> callback.handleException(ctx, new RuntimeException()));
+
+        synchronized (callback) {
+            barrier.await(5, TimeUnit.SECONDS);
+            callback.wait();
+        }
+
+        assertThrows(ExecutionException.class, () -> callback.get());
+        assertThrows(ExecutionException.class, () -> callback.get(10, TimeUnit.MILLISECONDS));
+        assertThat(callback.isCancelled(), equalTo(false));
+        assertThat(callback.isDone(), equalTo(true));
+    }
+
+    @Test
+    public void testGetResponseContextOnExceptionCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(1);
+        schedule(barrier, () -> callback.handleException(ctx, new RuntimeException()));
+
+        assertThrows(ExecutionException.class, () -> callback.getResponseContext());
+        assertThat(callback.isCancelled(), equalTo(false));
+        assertThat(callback.isDone(), equalTo(true));
+    }
+
+    @Test
+    public void testHandleCancellationCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(2);
+        schedule(barrier, () -> callback.cancel(true));
+
+        synchronized (callback) {
+            barrier.await(5, TimeUnit.SECONDS);
+            callback.wait();
+        }
+
+        assertThrows(InterruptedException.class, () -> callback.get());
+        assertThrows(InterruptedException.class, () -> callback.get(10, TimeUnit.MILLISECONDS));
+        assertThat(callback.isCancelled(), equalTo(true));
+        assertThat(callback.isDone(), equalTo(true));
+    }
+    
+
+    @Test
+    public void testHandleCancellationCallbackWhenStarted() throws Exception {
+        callback.start(new MessageImpl());
+        assertThat(callback.cancel(true), equalTo(false));
+        assertThat(callback.isCancelled(), equalTo(false));
+        assertThat(callback.isDone(), equalTo(false));
+    }
+
+    @Test
+    public void testGetResponseContextOnCancellationCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(1);
+        schedule(barrier, () -> callback.cancel(true));
+
+        assertThrows(InterruptedException.class, () -> callback.getResponseContext());
+        assertThat(callback.isCancelled(), equalTo(true));
+        assertThat(callback.isDone(), equalTo(true));
+    }
+
+    @Test(expected = TimeoutException.class)
+    public void testTimeout() throws Exception {
+        callback.get(10, TimeUnit.MILLISECONDS);
+    }
+
+    private void schedule(CyclicBarrier barrier, Runnable runnable) {
+        executor.schedule(() -> {
+            barrier.await(5, TimeUnit.SECONDS);
+            runnable.run();
+            return null;
+        }, 100, TimeUnit.MILLISECONDS);
+    }
+}
diff --git a/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/JaxwsClientCallback.java b/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/JaxwsClientCallback.java
index ceb1450..28b4c24 100644
--- a/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/JaxwsClientCallback.java
+++ b/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/JaxwsClientCallback.java
@@ -39,10 +39,12 @@ class JaxwsClientCallback<T> extends ClientCallback {
     }
     public void handleResponse(Map<String, Object> ctx, Object[] res) {
         context = ctx;
-        result = res;
+        delegate.complete(res);
+        
         if (handler != null) {
             handler.handleResponse(new Response<T>() {
-
+                protected boolean cancelled;
+                
                 public Map<String, Object> getContext() {
                     return context;
                 }
@@ -54,13 +56,13 @@ class JaxwsClientCallback<T> extends ClientCallback {
 
                 @SuppressWarnings("unchecked")
                 public T get() throws InterruptedException, ExecutionException {
-                    return (T)result[0];
+                    return (T)res[0];
                 }
 
                 @SuppressWarnings("unchecked")
                 public T get(long timeout, TimeUnit unit) throws InterruptedException,
                     ExecutionException, TimeoutException {
-                    return (T)result[0];
+                    return (T)res[0];
                 }
 
                 public boolean isCancelled() {
@@ -73,7 +75,7 @@ class JaxwsClientCallback<T> extends ClientCallback {
 
             });
         }
-        done = true;
+        
         synchronized (this) {
             notifyAll();
         }
@@ -82,9 +84,10 @@ class JaxwsClientCallback<T> extends ClientCallback {
     @Override
     public void handleException(Map<String, Object> ctx, final Throwable ex) {
         context = ctx;
-        exception = mapThrowable(ex);
+        delegate.completeExceptionally(mapThrowable(ex));
         if (handler != null) {
             handler.handleResponse(new Response<T>() {
+                protected boolean cancelled;
 
                 public Map<String, Object> getContext() {
                     return context;
@@ -115,7 +118,7 @@ class JaxwsClientCallback<T> extends ClientCallback {
 
             });
         }
-        done = true;
+
         synchronized (this) {
             notifyAll();
         }
diff --git a/rt/frontend/jaxws/src/test/java/org/apache/cxf/jaxws/JaxwsClientCallbackTest.java b/rt/frontend/jaxws/src/test/java/org/apache/cxf/jaxws/JaxwsClientCallbackTest.java
new file mode 100644
index 0000000..1e86703
--- /dev/null
+++ b/rt/frontend/jaxws/src/test/java/org/apache/cxf/jaxws/JaxwsClientCallbackTest.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.jaxws;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.xml.ws.AsyncHandler;
+import javax.xml.ws.Response;
+
+import org.apache.cxf.message.MessageImpl;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+public class JaxwsClientCallbackTest {
+    private Map<String, Object> ctx;
+    private JaxwsClientCallback<String> callback;
+    private AsyncHandler<String> handler;
+    private ScheduledExecutorService executor;
+    
+    @Before
+    public void setUp() {
+        executor = Executors.newSingleThreadScheduledExecutor();
+        handler = new AsyncHandler<String>() {
+            @Override
+            public void handleResponse(Response<String> res) {
+            }
+        };
+        
+        callback = new JaxwsClientCallback<String>(handler, null);
+        ctx = new HashMap<String, Object>();
+    }
+    
+    @After
+    public void tearDown() throws Exception {
+        executor.shutdown();
+        executor.awaitTermination(5, TimeUnit.SECONDS);
+    }
+    
+    @Test
+    public void testHandleResponseCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(2);
+
+        Object[] result = new Object[0];
+        schedule(barrier, () -> callback.handleResponse(ctx, result));
+        barrier.await(5, TimeUnit.SECONDS);
+        
+        assertThat(callback.get(), equalTo(result));
+        assertThat(callback.get(10, TimeUnit.MILLISECONDS), equalTo(result));
+        assertThat(callback.isCancelled(), equalTo(false));
+        assertThat(callback.isDone(), equalTo(true));
+    }
+    
+    @Test
+    public void testGetResponseContextOnSuccessCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(2);
+
+        Object[] result = new Object[0];
+        schedule(barrier, () -> callback.handleResponse(ctx, result));
+        barrier.await(5, TimeUnit.SECONDS);
+        
+        assertThat(callback.getResponseContext(), equalTo(ctx));
+        assertThat(callback.get(), equalTo(result));
+        assertThat(callback.get(10, TimeUnit.MILLISECONDS), equalTo(result));
+        assertThat(callback.isCancelled(), equalTo(false));
+        assertThat(callback.isDone(), equalTo(true));
+    }
+    
+    @Test
+    public void testHandleExceptionCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(1);
+        schedule(barrier, () -> callback.handleException(ctx, new RuntimeException()));
+
+        assertThrows(ExecutionException.class, () -> callback.get());
+        assertThrows(ExecutionException.class, () -> callback.get(10, TimeUnit.MILLISECONDS));
+        assertThat(callback.isCancelled(), equalTo(false));
+        assertThat(callback.isDone(), equalTo(true));
+    }
+
+    @Test
+    public void testGetResponseContextOnExceptionCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(1);
+        schedule(barrier, () -> callback.handleException(ctx, new RuntimeException()));
+
+        assertThrows(ExecutionException.class, () -> callback.getResponseContext());
+        assertThat(callback.isCancelled(), equalTo(false));
+        assertThat(callback.isDone(), equalTo(true));
+    }
+
+    @Test
+    public void testHandleCancellationCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(2);
+        schedule(barrier, () -> callback.cancel(true));
+        barrier.await(5, TimeUnit.SECONDS);
+
+        assertThrows(InterruptedException.class, () -> callback.get());
+        assertThrows(InterruptedException.class, () -> callback.get(10, TimeUnit.MILLISECONDS));
+        assertThat(callback.isCancelled(), equalTo(true));
+        assertThat(callback.isDone(), equalTo(true));
+    }
+    
+
+    @Test
+    public void testHandleCancellationCallbackWhenStarted() throws Exception {
+        callback.start(new MessageImpl());
+        assertThat(callback.cancel(true), equalTo(false));
+        assertThat(callback.isCancelled(), equalTo(false));
+        assertThat(callback.isDone(), equalTo(false));
+    }
+
+    @Test
+    public void testGetResponseContextOnCancellationCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(1);
+        schedule(barrier, () -> callback.cancel(true));
+
+        assertThrows(InterruptedException.class, () -> callback.getResponseContext());
+        assertThat(callback.isCancelled(), equalTo(true));
+        assertThat(callback.isDone(), equalTo(true));
+    }
+    
+    @Test(expected = TimeoutException.class)
+    public void testTimeout() throws Exception {
+        callback.get(10, TimeUnit.MILLISECONDS);
+    }
+
+    private void schedule(CyclicBarrier barrier, Runnable runnable) {
+        executor.schedule(() -> {
+            barrier.await(5, TimeUnit.SECONDS);
+            runnable.run();
+            return null;
+        }, 100, TimeUnit.MILLISECONDS);
+    }
+}
diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/ClientProxyImpl.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/ClientProxyImpl.java
index 59de875..e48a449 100644
--- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/ClientProxyImpl.java
+++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/ClientProxyImpl.java
@@ -921,7 +921,7 @@ public class ClientProxyImpl extends AbstractClient implements
                                    InvocationCallback<Object> asyncCallback) {
         outMessage.getExchange().setSynchronous(false);
         setAsyncMessageObserverIfNeeded(outMessage.getExchange());
-        JaxrsClientCallback<?> cb = newJaxrsClientCallback(asyncCallback,
+        JaxrsClientCallback<?> cb = newJaxrsClientCallback(asyncCallback, outMessage,
             ori.getMethodToInvoke().getReturnType(), ori.getMethodToInvoke().getGenericReturnType());
         outMessage.getExchange().put(JaxrsClientCallback.class, cb);
         doRunInterceptorChain(outMessage);
@@ -930,6 +930,7 @@ public class ClientProxyImpl extends AbstractClient implements
     }
 
     protected JaxrsClientCallback<?> newJaxrsClientCallback(InvocationCallback<Object> asyncCallback,
+                                                            Message outMessage,
                                                             Class<?> responseClass,
                                                             Type outGenericType) {
         return new JaxrsClientCallback<>(asyncCallback, responseClass, outGenericType);
diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java
index 98443c5..5db63bc 100644
--- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java
+++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java
@@ -71,11 +71,11 @@ public class JaxrsClientCallback<T> extends ClientCallback {
     @SuppressWarnings("unchecked")
     public void handleResponse(Map<String, Object> ctx, Object[] res) {
         context = ctx;
-        result = res;
+        delegate.complete(res);
         if (handler != null) {
             handler.completed((T)res[0]);
         }
-        done = true;
+        
         synchronized (this) {
             notifyAll();
         }
@@ -84,11 +84,11 @@ public class JaxrsClientCallback<T> extends ClientCallback {
     @Override
     public void handleException(Map<String, Object> ctx, final Throwable ex) {
         context = ctx;
-        exception = ex;
+        delegate.completeExceptionally(ex);
         if (handler != null) {
-            handler.failed(exception);
+            handler.failed(ex);
         }
-        done = true;
+        
         synchronized (this) {
             notifyAll();
         }
diff --git a/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/JaxrsClientCallbackTest.java b/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/JaxrsClientCallbackTest.java
new file mode 100644
index 0000000..4f1b124
--- /dev/null
+++ b/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/JaxrsClientCallbackTest.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.jaxrs.client;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.ws.rs.client.InvocationCallback;
+
+import org.apache.cxf.jaxrs.client.JaxrsClientCallback.JaxrsResponseFuture;
+import org.apache.cxf.message.MessageImpl;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+public class JaxrsClientCallbackTest {
+    private Map<String, Object> ctx;
+    private JaxrsClientCallback<String> callback;
+    private InvocationCallback<String> handler;
+    private ScheduledExecutorService executor;
+    private JaxrsResponseFuture<String> future;
+    
+    @Before
+    public void setUp() {
+        executor = Executors.newSingleThreadScheduledExecutor();
+        handler = new InvocationCallback<String>() {
+            @Override
+            public void failed(Throwable throwable) {
+            }
+            
+            @Override
+            public void completed(String response) {
+            }
+        };
+        
+        callback = new JaxrsClientCallback<String>(handler, String.class, null);
+        future = (JaxrsResponseFuture<String>)callback.createFuture();
+        ctx = new HashMap<String, Object>();
+    }
+    
+    @After
+    public void tearDown() throws Exception {
+        executor.shutdown();
+        executor.awaitTermination(5, TimeUnit.SECONDS);
+    }
+    
+    @Test
+    public void testHandleResponseCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(2);
+
+        Object[] result = new String[] {"results"};
+        schedule(barrier, () -> callback.handleResponse(ctx, result));
+        barrier.await(5, TimeUnit.SECONDS);
+        
+        assertThat(future.get(), equalTo("results"));
+        assertThat(future.get(10, TimeUnit.MILLISECONDS), equalTo("results"));
+        assertThat(future.isCancelled(), equalTo(false));
+        assertThat(future.isDone(), equalTo(true));
+    }
+    
+    @Test
+    public void testGetResponseContextOnSuccessCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(2);
+
+        Object[] result = new String[] {"results"};
+        schedule(barrier, () -> callback.handleResponse(ctx, result));
+        barrier.await(5, TimeUnit.SECONDS);
+        
+        assertThat(future.getContext(), equalTo(ctx));
+        assertThat(future.get(), equalTo("results"));
+        assertThat(future.get(10, TimeUnit.MILLISECONDS), equalTo("results"));
+        assertThat(future.isCancelled(), equalTo(false));
+        assertThat(future.isDone(), equalTo(true));
+    }
+    
+    @Test
+    public void testHandleExceptionCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(1);
+        schedule(barrier, () -> callback.handleException(ctx, new RuntimeException()));
+
+        assertThrows(ExecutionException.class, () -> future.get());
+        assertThrows(ExecutionException.class, () -> future.get(10, TimeUnit.MILLISECONDS));
+        assertThat(future.isCancelled(), equalTo(false));
+        assertThat(future.isDone(), equalTo(true));
+    }
+
+    @Test
+    public void testGetResponseContextOnExceptionCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(1);
+        schedule(barrier, () -> callback.handleException(ctx, new RuntimeException()));
+
+        assertThat(future.getContext(), nullValue());
+        assertThat(future.isCancelled(), equalTo(false));
+        assertThat(future.isDone(), equalTo(true));
+    }
+
+    @Test
+    public void testHandleCancellationCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(2);
+        schedule(barrier, () -> future.cancel(true));
+        barrier.await(5, TimeUnit.SECONDS);
+
+        assertThrows(InterruptedException.class, () -> future.get());
+        assertThrows(InterruptedException.class, () -> future.get(10, TimeUnit.MILLISECONDS));
+        assertThat(future.isCancelled(), equalTo(true));
+        assertThat(future.isDone(), equalTo(true));
+    }
+    
+
+    @Test
+    public void testHandleCancellationCallbackWhenStarted() throws Exception {
+        callback.start(new MessageImpl());
+        assertThat(future.cancel(true), equalTo(false));
+        assertThat(future.isCancelled(), equalTo(false));
+        assertThat(future.isDone(), equalTo(false));
+    }
+
+    @Test
+    public void testGetResponseContextOnCancellationCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(1);
+        schedule(barrier, () -> future.cancel(true));
+
+        assertThat(future.getContext(), nullValue());
+        assertThat(future.isCancelled(), equalTo(true));
+        assertThat(future.isDone(), equalTo(true));
+    }
+    
+    @Test(expected = TimeoutException.class)
+    public void testTimeout() throws Exception {
+        future.get(10, TimeUnit.MILLISECONDS);
+    }
+
+    private void schedule(CyclicBarrier barrier, Runnable runnable) {
+        executor.schedule(() -> {
+            barrier.await(5, TimeUnit.SECONDS);
+            runnable.run();
+            return null;
+        }, 100, TimeUnit.MILLISECONDS);
+    }
+}
diff --git a/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MPRestClientCallback.java b/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MPRestClientCallback.java
index a9c8857..a37dde9 100644
--- a/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MPRestClientCallback.java
+++ b/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MPRestClientCallback.java
@@ -20,81 +20,37 @@
 package org.apache.cxf.microprofile.client;
 
 import java.lang.reflect.Type;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import javax.ws.rs.client.InvocationCallback;
 
 import org.apache.cxf.jaxrs.client.JaxrsClientCallback;
+import org.apache.cxf.message.Message;
 
 public class MPRestClientCallback<T> extends JaxrsClientCallback<T> {
+    private final ExecutorService executor; // executor on which createFuture() applies its result mapping
 
     public MPRestClientCallback(InvocationCallback<T> handler,
+                                Message outMessage,
                                 Class<?> responseClass,
                                 Type outGenericType) {
         super(handler, responseClass, outGenericType);
+        ExecutorService es = outMessage.get(ExecutorService.class); // executor stored on the outbound message, if any
+        if (es == null) {
+            es = AccessController.doPrivileged((PrivilegedAction<ExecutorService>)() -> { // fallback is looked up inside a privileged action
+                return ForkJoinPool.commonPool();
+            });
+        }
+        executor = es;
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public Future<T> createFuture() {
-        return new MPRestClientResponseFuture<T>(this);
-    }
-
-    static class MPRestClientResponseFuture<T> extends CompletableFuture<T> implements Future<T> {
-        MPRestClientCallback<T> callback;
-        MPRestClientResponseFuture(MPRestClientCallback<T> cb) {
-            callback = cb;
-        }
-
-        public Map<String, Object> getContext() {
-            try {
-                return callback.getResponseContext();
-            } catch (Exception ex) {
-                return null;
-            }
-        }
-        public boolean cancel(boolean mayInterruptIfRunning) {
-            return callback.cancel(mayInterruptIfRunning);
-        }
-
-        public T get() throws InterruptedException, ExecutionException {
-            try {
-                return getObject(callback.get()[0]);
-            } catch (InterruptedException ex) {
-                InvocationCallback<T> handler = callback.getHandler();
-                if (handler != null) {
-                    handler.failed(ex);
-                }
-                throw ex;
-            }
-        }
-        public T get(long timeout, TimeUnit unit) throws InterruptedException,
-            ExecutionException, TimeoutException {
-            try {
-                return getObject(callback.get(timeout, unit)[0]);
-            } catch (InterruptedException ex) {
-                InvocationCallback<T> handler = callback.getHandler();
-                if (handler != null) {
-                    handler.failed(ex);
-                }
-                throw ex;
-            }
-        }
-
-        @SuppressWarnings("unchecked")
-        private T getObject(Object object) {
-            return (T)object;
-        }
-
-        public boolean isCancelled() {
-            return callback.isCancelled();
-        }
-        public boolean isDone() {
-            return callback.isDone();
-        }
+        return delegate.thenApplyAsync(res -> (T)res[0], executor); // maps delegate's result to its element 0 (unchecked cast), run on executor
     }
 }
\ No newline at end of file
diff --git a/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/proxy/MicroProfileClientProxyImpl.java b/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/proxy/MicroProfileClientProxyImpl.java
index 46dee88..2bd02ee 100644
--- a/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/proxy/MicroProfileClientProxyImpl.java
+++ b/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/proxy/MicroProfileClientProxyImpl.java
@@ -111,9 +111,10 @@ public class MicroProfileClientProxyImpl extends ClientProxyImpl {
 
     @Override
     protected JaxrsClientCallback<?> newJaxrsClientCallback(InvocationCallback<Object> asyncCallback,
+                                                            Message outMessage, // outbound message, handed on to the callback constructor
                                                             Class<?> responseClass,
                                                             Type outGenericType) {
-        return new MPRestClientCallback<Object>(asyncCallback, responseClass, outGenericType);
+        return new MPRestClientCallback<Object>(asyncCallback, outMessage, responseClass, outGenericType); // MicroProfile-specific callback
     }
 
     @Override
diff --git a/rt/rs/microprofile-client/src/test/java/org/apache/cxf/microprofile/client/MPRestClientCallbackTest.java b/rt/rs/microprofile-client/src/test/java/org/apache/cxf/microprofile/client/MPRestClientCallbackTest.java
new file mode 100644
index 0000000..ad36f1e
--- /dev/null
+++ b/rt/rs/microprofile-client/src/test/java/org/apache/cxf/microprofile/client/MPRestClientCallbackTest.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.microprofile.client;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.ws.rs.client.InvocationCallback;
+
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+public class MPRestClientCallbackTest { // exercises MPRestClientCallback and the future returned by createFuture()
+    private Map<String, Object> ctx;
+    private MPRestClientCallback<String> callback;
+    private InvocationCallback<String> handler;
+    private ScheduledExecutorService executor;
+    private CompletableFuture<String> future;
+    private Message message;
+    
+    @Before
+    public void setUp() {
+        executor = Executors.newSingleThreadScheduledExecutor(); // one scheduler thread, used by schedule() to act off the test thread
+        handler = new InvocationCallback<String>() { // no-op handler: the tests observe only the callback and its future
+            @Override
+            public void failed(Throwable throwable) {
+            }
+            
+            @Override
+            public void completed(String response) {
+            }
+        };
+        
+        message = new MessageImpl();
+        callback = new MPRestClientCallback<String>(handler, message, String.class, null); // outGenericType is passed as null
+        future = (CompletableFuture<String>)callback.createFuture(); // createFuture() is declared as Future; the cast assumes a CompletableFuture
+        ctx = new HashMap<String, Object>();
+    }
+    
+    @After
+    public void tearDown() throws Exception {
+        executor.shutdown(); // stop accepting new tasks
+        executor.awaitTermination(5, TimeUnit.SECONDS); // wait up to 5 s for termination
+    }
+    
+    @Test // A response delivered from the scheduler thread is visible through the future.
+    public void testHandleResponseCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(2); // two parties: the test thread and the scheduled task
+
+        Object[] result = new String[] {"results"};
+        schedule(barrier, () -> callback.handleResponse(ctx, result));
+        barrier.await(5, TimeUnit.SECONDS); // rendezvous with the scheduled task before it delivers the response
+        
+        assertThat(future.get(), equalTo("results")); // the future is expected to yield element 0 of the result array
+        assertThat(future.get(10, TimeUnit.MILLISECONDS), equalTo("results"));
+        assertThat(future.isCancelled(), equalTo(false));
+        assertThat(future.isDone(), equalTo(true));
+    }
+    
+    @Test // After a successful response the callback reports the context it was handed.
+    public void testGetResponseContextOnSuccessCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(2); // two parties: the test thread and the scheduled task
+
+        Object[] result = new String[] {"results"};
+        schedule(barrier, () -> callback.handleResponse(ctx, result));
+        barrier.await(5, TimeUnit.SECONDS);
+        
+        assertThat(callback.getResponseContext(), equalTo(ctx)); // same map that was passed to handleResponse
+        assertThat(future.get(), equalTo("results"));
+        assertThat(future.get(10, TimeUnit.MILLISECONDS), equalTo("results"));
+        assertThat(future.isCancelled(), equalTo(false));
+        assertThat(future.isDone(), equalTo(true));
+    }
+    
+    @Test // A failure reported to the callback surfaces as ExecutionException from get().
+    public void testHandleExceptionCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(1); // single party: the task's await() does not wait for this thread
+        schedule(barrier, () -> callback.handleException(ctx, new RuntimeException()));
+
+        assertThrows(ExecutionException.class, () -> future.get());
+        assertThrows(ExecutionException.class, () -> future.get(10, TimeUnit.MILLISECONDS));
+        assertThat(future.isCancelled(), equalTo(false));
+        assertThat(future.isDone(), equalTo(true));
+    }
+
+    @Test // After a failure, asking the callback for the response context is expected to throw.
+    public void testGetResponseContextOnExceptionCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(1); // single party: the task's await() does not wait for this thread
+        schedule(barrier, () -> callback.handleException(ctx, new RuntimeException()));
+
+        assertThrows(ExecutionException.class, () -> callback.getResponseContext());
+        assertThat(future.isCancelled(), equalTo(false));
+        assertThat(future.isDone(), equalTo(true)); // NOTE(review): confirm the derived future is guaranteed complete by this point
+    }
+
+    @Test // Cancelling the future from the scheduler thread makes get() throw CancellationException.
+    public void testHandleCancellationCallback() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(2); // two parties: the test thread and the scheduled task
+        schedule(barrier, () -> future.cancel(true));
+        barrier.await(5, TimeUnit.SECONDS);
+
+        assertThrows(CancellationException.class, () -> future.get());
+        assertThrows(CancellationException.class, () -> future.get(10, TimeUnit.MILLISECONDS));
+        assertThat(future.isCancelled(), equalTo(true));
+        assertThat(future.isDone(), equalTo(true));
+    }
+
+    @Test // Cancellation requested after the callback has been started.
+    public void testHandleCancellationCallbackWhenStarted() throws Exception {
+        callback.start(message);
+        assertThat(future.cancel(true), equalTo(true)); // the cancel request is expected to succeed on this future
+        assertThat(future.isCancelled(), equalTo(true));
+        assertThat(future.isDone(), equalTo(true));
+    }
+
+    @Test(expected = TimeoutException.class) // passes only if the timed get() throws TimeoutException
+    public void testTimeout() throws Exception {
+        future.get(10, TimeUnit.MILLISECONDS); // no completion is scheduled in this test
+    }
+
+    private void schedule(CyclicBarrier barrier, Runnable runnable) { // submits runnable to the executor with a 100 ms delay
+        executor.schedule(() -> {
+            barrier.await(5, TimeUnit.SECONDS); // wait up to 5 s for the remaining barrier parties, if any
+            runnable.run();
+            return null; // value-returning lambda selects the Callable overload, so await() may throw checked exceptions
+        }, 100, TimeUnit.MILLISECONDS);
+    }
+}


[cxf] 02/02: Recording .gitmergeinfo Changes

Posted by re...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch 3.2.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git

commit 0166c963080975b64e38f43f0852beb1a8ae1cdd
Author: reta <dr...@gmail.com>
AuthorDate: Sun Mar 29 11:07:20 2020 -0400

    Recording .gitmergeinfo Changes
---
 .gitmergeinfo | 1 +
 1 file changed, 1 insertion(+)

diff --git a/.gitmergeinfo b/.gitmergeinfo
index c78b1bc..8a6e94d 100644
--- a/.gitmergeinfo
+++ b/.gitmergeinfo
@@ -934,6 +934,7 @@ M 5cfbdfeb59581514f3b2773abdca57728e19fdad
 M 5f6c4497f03cbd8a59d0732c9ea329891ef38f49
 M 68a248a5ce0d8db6d991222c110a161764aedf9d
 M 6b121380f908030dd9829615deb507c2e0fa2a08
+M 6b602d3c267c5c73b2bced1c5a8368f0d6a06da1
 M 6b8b33acde5eb94ca84121591c68eb3e5e7b294f
 M 7233ba20684c3e8f20bf363647c028b2f406f08c
 M 73a8be32b5e143dcf1908bd4e73249c116b14f7c