You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by co...@apache.org on 2018/10/23 10:58:55 UTC

[cxf] 01/03: CXF-7881: Ensure proper allowCurrentThread behavior

This is an automated email from the ASF dual-hosted git repository.

coheigea pushed a commit to branch 3.2.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git

commit 6c2ef56e567b70a36761f27853eecef6bf20af00
Author: Andy McCright <j....@gmail.com>
AuthorDate: Mon Oct 22 17:00:23 2018 -0500

    CXF-7881: Ensure proper allowCurrentThread behavior
    
    In the case where the current Executor throws a
    RejectedExecutionException:
    
    This should ensure that if allowCurrentThread is false OR the client
    policy has setAsyncExecuteTimeoutRejection, then the the caller should
    receive a RejectedExecutionException.
    
    If allowCurrentThread is true, then current thread will handle the
    async response.
    
    (cherry picked from commit 326c40dd52d857d4b1a33d18b465f0a1580044e8)
---
 .../org/apache/cxf/transport/http/HTTPConduit.java |   6 +-
 .../apache/cxf/transport/http/HTTPConduitTest.java |  55 ++++++-
 .../apache/cxf/transport/http/MockHTTPConduit.java | 163 +++++++++++++++++++++
 3 files changed, 220 insertions(+), 4 deletions(-)

diff --git a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
index e62f594..2bcc8bd 100644
--- a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
+++ b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
@@ -1242,10 +1242,10 @@ public abstract class HTTPConduit
                         ex.execute(runnable);
                     }
                 } catch (RejectedExecutionException rex) {
-                    if (allowCurrentThread
-                        && policy != null
+                    if (!allowCurrentThread
+                        || (policy != null
                         && policy.isSetAsyncExecuteTimeoutRejection()
-                        && policy.isAsyncExecuteTimeoutRejection()) {
+                        && policy.isAsyncExecuteTimeoutRejection())) {
                         throw rex;
                     }
                     if (!hasLoggedAsyncWarning) {
diff --git a/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java b/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java
index 343fb95..848e90a 100644
--- a/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java
+++ b/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java
@@ -26,16 +26,24 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
 
 import org.apache.cxf.Bus;
 import org.apache.cxf.bus.extension.ExtensionManagerBus;
 import org.apache.cxf.common.util.Base64Utility;
 import org.apache.cxf.configuration.security.AuthorizationPolicy;
+import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.endpoint.EndpointImpl;
 import org.apache.cxf.helpers.CastUtils;
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.ExchangeImpl;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageImpl;
 import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.http.HTTPConduit.WrappedOutputStream;
 import org.apache.cxf.transport.http.auth.HttpAuthSupplier;
+import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 import org.apache.cxf.ws.addressing.EndpointReferenceUtils;
 
@@ -58,7 +66,7 @@ public class HTTPConduitTest extends Assert {
     /**
      * Generates a new message.
      */
-    private Message getNewMessage() {
+    private Message getNewMessage() throws Exception {
         Message message = new MessageImpl();
         Map<String, List<String>> headers = new TreeMap<String, List<String>>(String.CASE_INSENSITIVE_ORDER);
         List<String> contentTypes = new ArrayList<>();
@@ -228,4 +236,49 @@ public class HTTPConduitTest extends Assert {
     }
 
 
+    @Test
+    public void testHandleResponseOnWorkqueueAllowCurrentThread() throws Exception {
+        Message m = getNewMessage();
+        Exchange exchange = new ExchangeImpl();
+        Bus bus = new ExtensionManagerBus();
+        exchange.put(Bus.class, bus);
+
+        EndpointInfo endpointInfo = new EndpointInfo();
+        Endpoint endpoint = new EndpointImpl(null, null, endpointInfo);
+        exchange.put(Endpoint.class, endpoint);
+
+        m.setExchange(exchange);
+
+        HTTPClientPolicy policy = new HTTPClientPolicy();
+        policy.setAsyncExecuteTimeoutRejection(true);
+        m.put(HTTPClientPolicy.class, policy);
+        exchange.put(Executor.class, new Executor() {
+
+            @Override
+            public void execute(Runnable command) {
+                // simulates a maxxed-out executor
+                // forces us to use current thread
+                throw new RejectedExecutionException("expected");
+            } });
+
+        HTTPConduit conduit = new MockHTTPConduit(bus, endpointInfo, policy);
+        OutputStream os = conduit.createOutputStream(m, false, false, 0);
+        assertTrue(os instanceof WrappedOutputStream);
+        WrappedOutputStream wos = (WrappedOutputStream) os;
+
+        try {
+            wos.handleResponseOnWorkqueue(true, false);
+            assertEquals(Thread.currentThread(), m.get(Thread.class));
+
+            try {
+                wos.handleResponseOnWorkqueue(false, false);
+                fail("Expected RejectedExecutionException not thrown");
+            } catch (RejectedExecutionException ex) {
+                assertEquals("expected", ex.getMessage());
+            }
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            throw ex;
+        }
+    }
 }
diff --git a/rt/transports/http/src/test/java/org/apache/cxf/transport/http/MockHTTPConduit.java b/rt/transports/http/src/test/java/org/apache/cxf/transport/http/MockHTTPConduit.java
new file mode 100644
index 0000000..94b5d12
--- /dev/null
+++ b/rt/transports/http/src/test/java/org/apache/cxf/transport/http/MockHTTPConduit.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.https.HttpsURLConnectionInfo;
+import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
+
+public class MockHTTPConduit extends HTTPConduit {
+
+    public MockHTTPConduit(Bus b, EndpointInfo ei, HTTPClientPolicy policy) throws IOException {
+        super(b, ei);
+        setClient(policy);
+    }
+
+    @Override
+    protected void setupConnection(Message message, Address address, HTTPClientPolicy csPolicy)
+        throws IOException {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    protected OutputStream createOutputStream(Message message, boolean needToCacheRequest, boolean isChunking,
+                                              int chunkThreshold)
+        throws IOException {
+        return new MockWrappedOutputStream(message, isChunking, isChunking, chunkThreshold, "mockConduit", null);
+    }
+
+    class MockWrappedOutputStream extends WrappedOutputStream {
+
+        protected MockWrappedOutputStream(Message outMessage, boolean possibleRetransmit, boolean isChunking,
+                                          int chunkThreshold, String conduitName, URI url) {
+            super(outMessage, possibleRetransmit, isChunking, chunkThreshold, conduitName, url);
+        }
+
+        @Override
+        protected void setupWrappedStream() throws IOException {
+            // TODO Auto-generated method stub
+            
+        }
+
+        @Override
+        protected HttpsURLConnectionInfo getHttpsURLConnectionInfo() throws IOException {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        protected void setProtocolHeaders() throws IOException {
+            // TODO Auto-generated method stub
+            
+        }
+
+        @Override
+        protected void setFixedLengthStreamingMode(int i) {
+            // TODO Auto-generated method stub
+            
+        }
+
+        @Override
+        protected int getResponseCode() throws IOException {
+            // TODO Auto-generated method stub
+            return 0;
+        }
+
+        @Override
+        protected String getResponseMessage() throws IOException {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        protected void updateResponseHeaders(Message inMessage) throws IOException {
+            // TODO Auto-generated method stub
+            
+        }
+
+        @Override
+        protected void handleResponseAsync() throws IOException {
+            // TODO Auto-generated method stub
+            
+        }
+
+        @Override
+        protected void handleResponseInternal() throws IOException {
+            outMessage.put(Thread.class, Thread.currentThread());
+        }
+
+        @Override
+        protected void closeInputStream() throws IOException {
+            // TODO Auto-generated method stub
+            
+        }
+
+        @Override
+        protected boolean usingProxy() {
+            // TODO Auto-generated method stub
+            return false;
+        }
+
+        @Override
+        protected InputStream getInputStream() throws IOException {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        protected InputStream getPartialResponse() throws IOException {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        protected void setupNewConnection(String newURL) throws IOException {
+            // TODO Auto-generated method stub
+            
+        }
+
+        @Override
+        protected void retransmitStream() throws IOException {
+            // TODO Auto-generated method stub
+            
+        }
+
+        @Override
+        protected void updateCookiesBeforeRetransmit() throws IOException {
+            // TODO Auto-generated method stub
+            
+        }
+
+        @Override
+        public void thresholdReached() throws IOException {
+            // TODO Auto-generated method stub
+            
+        }
+        
+    }
+}