You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@cxf.apache.org by GitBox <gi...@apache.org> on 2018/10/23 10:57:44 UTC

[GitHub] coheigea closed pull request #463: CXF-7881: Ensure proper allowCurrentThread behavior

coheigea closed pull request #463: CXF-7881: Ensure proper allowCurrentThread behavior
URL: https://github.com/apache/cxf/pull/463
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not display its
diff after the merge, so the diff is supplied below:

diff --git a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
index e62f59477ca..2bcc8bde272 100644
--- a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
+++ b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
@@ -1242,10 +1242,10 @@ public void run() {
                         ex.execute(runnable);
                     }
                 } catch (RejectedExecutionException rex) {
-                    if (allowCurrentThread
-                        && policy != null
+                    if (!allowCurrentThread
+                        || (policy != null
                         && policy.isSetAsyncExecuteTimeoutRejection()
-                        && policy.isAsyncExecuteTimeoutRejection()) {
+                        && policy.isAsyncExecuteTimeoutRejection())) {
                         throw rex;
                     }
                     if (!hasLoggedAsyncWarning) {
diff --git a/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java b/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java
index 343fb954c21..848e90a8e2c 100644
--- a/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java
+++ b/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java
@@ -26,16 +26,24 @@
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
 
 import org.apache.cxf.Bus;
 import org.apache.cxf.bus.extension.ExtensionManagerBus;
 import org.apache.cxf.common.util.Base64Utility;
 import org.apache.cxf.configuration.security.AuthorizationPolicy;
+import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.endpoint.EndpointImpl;
 import org.apache.cxf.helpers.CastUtils;
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.ExchangeImpl;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageImpl;
 import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.http.HTTPConduit.WrappedOutputStream;
 import org.apache.cxf.transport.http.auth.HttpAuthSupplier;
+import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 import org.apache.cxf.ws.addressing.EndpointReferenceUtils;
 
@@ -58,7 +66,7 @@ public void tearDown() {
     /**
      * Generates a new message.
      */
-    private Message getNewMessage() {
+    private Message getNewMessage() throws Exception {
         Message message = new MessageImpl();
         Map<String, List<String>> headers = new TreeMap<String, List<String>>(String.CASE_INSENSITIVE_ORDER);
         List<String> contentTypes = new ArrayList<>();
@@ -228,4 +236,49 @@ public void testAuthPolicyPrecedence() throws Exception {
     }
 
 
+    @Test
+    public void testHandleResponseOnWorkqueueAllowCurrentThread() throws Exception {
+        Message m = getNewMessage();
+        Exchange exchange = new ExchangeImpl();
+        Bus bus = new ExtensionManagerBus();
+        exchange.put(Bus.class, bus);
+
+        EndpointInfo endpointInfo = new EndpointInfo();
+        Endpoint endpoint = new EndpointImpl(null, null, endpointInfo);
+        exchange.put(Endpoint.class, endpoint);
+
+        m.setExchange(exchange);
+
+        HTTPClientPolicy policy = new HTTPClientPolicy();
+        policy.setAsyncExecuteTimeoutRejection(true);
+        m.put(HTTPClientPolicy.class, policy);
+        exchange.put(Executor.class, new Executor() {
+
+            @Override
+            public void execute(Runnable command) {
+                // simulates a maxed-out executor
+                // forces us to use current thread
+                throw new RejectedExecutionException("expected");
+            } });
+
+        HTTPConduit conduit = new MockHTTPConduit(bus, endpointInfo, policy);
+        OutputStream os = conduit.createOutputStream(m, false, false, 0);
+        assertTrue(os instanceof WrappedOutputStream);
+        WrappedOutputStream wos = (WrappedOutputStream) os;
+
+        try {
+            wos.handleResponseOnWorkqueue(true, false);
+            assertEquals(Thread.currentThread(), m.get(Thread.class));
+
+            try {
+                wos.handleResponseOnWorkqueue(false, false);
+                fail("Expected RejectedExecutionException not thrown");
+            } catch (RejectedExecutionException ex) {
+                assertEquals("expected", ex.getMessage());
+            }
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            throw ex;
+        }
+    }
 }
diff --git a/rt/transports/http/src/test/java/org/apache/cxf/transport/http/MockHTTPConduit.java b/rt/transports/http/src/test/java/org/apache/cxf/transport/http/MockHTTPConduit.java
new file mode 100644
index 00000000000..94b5d120f83
--- /dev/null
+++ b/rt/transports/http/src/test/java/org/apache/cxf/transport/http/MockHTTPConduit.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.https.HttpsURLConnectionInfo;
+import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
+
+public class MockHTTPConduit extends HTTPConduit {
+
+    public MockHTTPConduit(Bus b, EndpointInfo ei, HTTPClientPolicy policy) throws IOException {
+        super(b, ei);
+        setClient(policy);
+    }
+
+    @Override
+    protected void setupConnection(Message message, Address address, HTTPClientPolicy csPolicy)
+        throws IOException {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    protected OutputStream createOutputStream(Message message, boolean needToCacheRequest, boolean isChunking,
+                                              int chunkThreshold)
+        throws IOException {
+        return new MockWrappedOutputStream(message, isChunking, isChunking, chunkThreshold, "mockConduit", null);
+    }
+
+    class MockWrappedOutputStream extends WrappedOutputStream {
+
+        protected MockWrappedOutputStream(Message outMessage, boolean possibleRetransmit, boolean isChunking,
+                                          int chunkThreshold, String conduitName, URI url) {
+            super(outMessage, possibleRetransmit, isChunking, chunkThreshold, conduitName, url);
+        }
+
+        @Override
+        protected void setupWrappedStream() throws IOException {
+            // TODO Auto-generated method stub
+            
+        }
+
+        @Override
+        protected HttpsURLConnectionInfo getHttpsURLConnectionInfo() throws IOException {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        protected void setProtocolHeaders() throws IOException {
+            // TODO Auto-generated method stub
+            
+        }
+
+        @Override
+        protected void setFixedLengthStreamingMode(int i) {
+            // TODO Auto-generated method stub
+            
+        }
+
+        @Override
+        protected int getResponseCode() throws IOException {
+            // TODO Auto-generated method stub
+            return 0;
+        }
+
+        @Override
+        protected String getResponseMessage() throws IOException {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        protected void updateResponseHeaders(Message inMessage) throws IOException {
+            // TODO Auto-generated method stub
+            
+        }
+
+        @Override
+        protected void handleResponseAsync() throws IOException {
+            // TODO Auto-generated method stub
+            
+        }
+
+        @Override
+        protected void handleResponseInternal() throws IOException {
+            outMessage.put(Thread.class, Thread.currentThread());
+        }
+
+        @Override
+        protected void closeInputStream() throws IOException {
+            // TODO Auto-generated method stub
+            
+        }
+
+        @Override
+        protected boolean usingProxy() {
+            // TODO Auto-generated method stub
+            return false;
+        }
+
+        @Override
+        protected InputStream getInputStream() throws IOException {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        protected InputStream getPartialResponse() throws IOException {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        protected void setupNewConnection(String newURL) throws IOException {
+            // TODO Auto-generated method stub
+            
+        }
+
+        @Override
+        protected void retransmitStream() throws IOException {
+            // TODO Auto-generated method stub
+            
+        }
+
+        @Override
+        protected void updateCookiesBeforeRetransmit() throws IOException {
+            // TODO Auto-generated method stub
+            
+        }
+
+        @Override
+        public void thresholdReached() throws IOException {
+            // TODO Auto-generated method stub
+            
+        }
+        
+    }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services