You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by co...@apache.org on 2018/10/23 10:58:55 UTC
[cxf] 01/03: CXF-7881: Ensure proper allowCurrentThread behavior
This is an automated email from the ASF dual-hosted git repository.
coheigea pushed a commit to branch 3.2.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git
commit 6c2ef56e567b70a36761f27853eecef6bf20af00
Author: Andy McCright <j....@gmail.com>
AuthorDate: Mon Oct 22 17:00:23 2018 -0500
CXF-7881: Ensure proper allowCurrentThread behavior
In the case where the current Executor throws a
RejectedExecutionException:
This should ensure that if allowCurrentThread is false OR the client
policy has setAsyncExecuteTimeoutRejection, then the caller should
receive a RejectedExecutionException.
If allowCurrentThread is true, then the current thread will handle the
async response.
(cherry picked from commit 326c40dd52d857d4b1a33d18b465f0a1580044e8)
---
.../org/apache/cxf/transport/http/HTTPConduit.java | 6 +-
.../apache/cxf/transport/http/HTTPConduitTest.java | 55 ++++++-
.../apache/cxf/transport/http/MockHTTPConduit.java | 163 +++++++++++++++++++++
3 files changed, 220 insertions(+), 4 deletions(-)
diff --git a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
index e62f594..2bcc8bd 100644
--- a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
+++ b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
@@ -1242,10 +1242,10 @@ public abstract class HTTPConduit
ex.execute(runnable);
}
} catch (RejectedExecutionException rex) {
- if (allowCurrentThread
- && policy != null
+ if (!allowCurrentThread
+ || (policy != null
&& policy.isSetAsyncExecuteTimeoutRejection()
- && policy.isAsyncExecuteTimeoutRejection()) {
+ && policy.isAsyncExecuteTimeoutRejection())) {
throw rex;
}
if (!hasLoggedAsyncWarning) {
diff --git a/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java b/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java
index 343fb95..848e90a 100644
--- a/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java
+++ b/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java
@@ -26,16 +26,24 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
import org.apache.cxf.Bus;
import org.apache.cxf.bus.extension.ExtensionManagerBus;
import org.apache.cxf.common.util.Base64Utility;
import org.apache.cxf.configuration.security.AuthorizationPolicy;
+import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.endpoint.EndpointImpl;
import org.apache.cxf.helpers.CastUtils;
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.ExchangeImpl;
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageImpl;
import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.http.HTTPConduit.WrappedOutputStream;
import org.apache.cxf.transport.http.auth.HttpAuthSupplier;
+import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.apache.cxf.ws.addressing.EndpointReferenceUtils;
@@ -58,7 +66,7 @@ public class HTTPConduitTest extends Assert {
/**
* Generates a new message.
*/
- private Message getNewMessage() {
+ private Message getNewMessage() throws Exception {
Message message = new MessageImpl();
Map<String, List<String>> headers = new TreeMap<String, List<String>>(String.CASE_INSENSITIVE_ORDER);
List<String> contentTypes = new ArrayList<>();
@@ -228,4 +236,49 @@ public class HTTPConduitTest extends Assert {
}
+ @Test
+ public void testHandleResponseOnWorkqueueAllowCurrentThread() throws Exception {
+ Message m = getNewMessage();
+ Exchange exchange = new ExchangeImpl();
+ Bus bus = new ExtensionManagerBus();
+ exchange.put(Bus.class, bus);
+ // The exchange carries an Endpoint built from the same EndpointInfo that is later given to the conduit.
+ EndpointInfo endpointInfo = new EndpointInfo();
+ Endpoint endpoint = new EndpointImpl(null, null, endpointInfo);
+ exchange.put(Endpoint.class, endpoint);
+
+ m.setExchange(exchange);
+ // Client policy with asyncExecuteTimeoutRejection set to true; stored on the message and passed to the mock conduit.
+ HTTPClientPolicy policy = new HTTPClientPolicy();
+ policy.setAsyncExecuteTimeoutRejection(true);
+ m.put(HTTPClientPolicy.class, policy);
+ exchange.put(Executor.class, new Executor() {
+
+ @Override
+ public void execute(Runnable command) {
+ // Simulates a maxed-out executor: every submitted task is rejected,
+ // so the response can only be handled on the calling thread (when that is allowed).
+ throw new RejectedExecutionException("expected");
+ } });
+ // MockHTTPConduit.createOutputStream returns a MockWrappedOutputStream, which extends WrappedOutputStream.
+ HTTPConduit conduit = new MockHTTPConduit(bus, endpointInfo, policy);
+ OutputStream os = conduit.createOutputStream(m, false, false, 0);
+ assertTrue(os instanceof WrappedOutputStream);
+ WrappedOutputStream wos = (WrappedOutputStream) os;
+ // First argument is presumably allowCurrentThread (per the test name; confirm against HTTPConduit).
+ try {
+ wos.handleResponseOnWorkqueue(true, false);
+ assertEquals(Thread.currentThread(), m.get(Thread.class));
+ // With false as the first argument, the executor's rejection ("expected") must reach the caller.
+ try {
+ wos.handleResponseOnWorkqueue(false, false);
+ fail("Expected RejectedExecutionException not thrown");
+ } catch (RejectedExecutionException ex) {
+ assertEquals("expected", ex.getMessage());
+ }
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ throw ex;
+ }
+ }
}
diff --git a/rt/transports/http/src/test/java/org/apache/cxf/transport/http/MockHTTPConduit.java b/rt/transports/http/src/test/java/org/apache/cxf/transport/http/MockHTTPConduit.java
new file mode 100644
index 0000000..94b5d12
--- /dev/null
+++ b/rt/transports/http/src/test/java/org/apache/cxf/transport/http/MockHTTPConduit.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.https.HttpsURLConnectionInfo;
+import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
+
+public class MockHTTPConduit extends HTTPConduit {
+ // Test double for HTTPConduit: installs the supplied client policy and hands out MockWrappedOutputStream instances.
+ public MockHTTPConduit(Bus b, EndpointInfo ei, HTTPClientPolicy policy) throws IOException {
+ super(b, ei);
+ setClient(policy);
+ }
+
+ @Override
+ protected void setupConnection(Message message, Address address, HTTPClientPolicy csPolicy)
+ throws IOException {
+ // No-op: this mock sets up no connection.
+
+ }
+ // NOTE(review): the call below passes isChunking for both possibleRetransmit and isChunking; needToCacheRequest is unused - confirm this is intended.
+ @Override
+ protected OutputStream createOutputStream(Message message, boolean needToCacheRequest, boolean isChunking,
+ int chunkThreshold)
+ throws IOException {
+ return new MockWrappedOutputStream(message, isChunking, isChunking, chunkThreshold, "mockConduit", null);
+ }
+ // WrappedOutputStream whose overrides are no-ops or return fixed defaults, except handleResponseInternal().
+ class MockWrappedOutputStream extends WrappedOutputStream {
+
+ protected MockWrappedOutputStream(Message outMessage, boolean possibleRetransmit, boolean isChunking,
+ int chunkThreshold, String conduitName, URI url) {
+ super(outMessage, possibleRetransmit, isChunking, chunkThreshold, conduitName, url);
+ }
+
+ @Override
+ protected void setupWrappedStream() throws IOException {
+ // No-op stub.
+
+ }
+
+ @Override
+ protected HttpsURLConnectionInfo getHttpsURLConnectionInfo() throws IOException {
+ // Stub: no HTTPS connection info is provided.
+ return null;
+ }
+
+ @Override
+ protected void setProtocolHeaders() throws IOException {
+ // No-op stub.
+
+ }
+
+ @Override
+ protected void setFixedLengthStreamingMode(int i) {
+ // No-op stub.
+
+ }
+
+ @Override
+ protected int getResponseCode() throws IOException {
+ // Stub: always reports response code 0.
+ return 0;
+ }
+
+ @Override
+ protected String getResponseMessage() throws IOException {
+ // Stub: no response message.
+ return null;
+ }
+
+ @Override
+ protected void updateResponseHeaders(Message inMessage) throws IOException {
+ // No-op stub.
+
+ }
+
+ @Override
+ protected void handleResponseAsync() throws IOException {
+ // No-op stub.
+
+ }
+ // Records the calling thread on the out message under Thread.class so a test can read back which thread handled the response.
+ @Override
+ protected void handleResponseInternal() throws IOException {
+ outMessage.put(Thread.class, Thread.currentThread());
+ }
+
+ @Override
+ protected void closeInputStream() throws IOException {
+ // No-op stub.
+
+ }
+
+ @Override
+ protected boolean usingProxy() {
+ // Stub: never reports a proxy.
+ return false;
+ }
+
+ @Override
+ protected InputStream getInputStream() throws IOException {
+ // Stub: no response stream.
+ return null;
+ }
+
+ @Override
+ protected InputStream getPartialResponse() throws IOException {
+ // Stub: no partial response.
+ return null;
+ }
+
+ @Override
+ protected void setupNewConnection(String newURL) throws IOException {
+ // No-op stub.
+
+ }
+
+ @Override
+ protected void retransmitStream() throws IOException {
+ // No-op stub.
+
+ }
+
+ @Override
+ protected void updateCookiesBeforeRetransmit() throws IOException {
+ // No-op stub.
+
+ }
+
+ @Override
+ public void thresholdReached() throws IOException {
+ // No-op stub.
+
+ }
+
+ }
+}