You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by co...@apache.org on 2018/10/23 10:58:54 UTC

[cxf] branch 3.2.x-fixes updated (2c71230 -> 93a673d)

This is an automated email from the ASF dual-hosted git repository.

coheigea pushed a change to branch 3.2.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git.


    from 2c71230  Recording .gitmergeinfo Changes
     new 6c2ef56  CXF-7881: Ensure proper allowCurrentThread behavior
     new 0cbb711  CXF-7882 - FailoverTargetSelector uses the Exchange as the key in the inProgress map
     new 93a673d  Recording .gitmergeinfo Changes

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .gitmergeinfo                                      |   4 +
 .../clustering/CircuitBreakerTargetSelector.java   |   2 +-
 .../cxf/clustering/FailoverTargetSelector.java     |  14 +-
 .../clustering/LoadDistributorTargetSelector.java  |   2 +-
 .../org/apache/cxf/transport/http/HTTPConduit.java |   6 +-
 .../apache/cxf/transport/http/HTTPConduitTest.java |  55 ++++++-
 .../apache/cxf/transport/http/MockHTTPConduit.java | 163 +++++++++++++++++++++
 7 files changed, 234 insertions(+), 12 deletions(-)
 create mode 100644 rt/transports/http/src/test/java/org/apache/cxf/transport/http/MockHTTPConduit.java


[cxf] 01/03: CXF-7881: Ensure proper allowCurrentThread behavior

Posted by co...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

coheigea pushed a commit to branch 3.2.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git

commit 6c2ef56e567b70a36761f27853eecef6bf20af00
Author: Andy McCright <j....@gmail.com>
AuthorDate: Mon Oct 22 17:00:23 2018 -0500

    CXF-7881: Ensure proper allowCurrentThread behavior
    
    In the case where the current Executor throws a
    RejectedExecutionException:
    
    This should ensure that if allowCurrentThread is false OR the client
    policy has setAsyncExecuteTimeoutRejection, then the caller should
    receive a RejectedExecutionException.
    
    If allowCurrentThread is true, then current thread will handle the
    async response.
    
    (cherry picked from commit 326c40dd52d857d4b1a33d18b465f0a1580044e8)
---
 .../org/apache/cxf/transport/http/HTTPConduit.java |   6 +-
 .../apache/cxf/transport/http/HTTPConduitTest.java |  55 ++++++-
 .../apache/cxf/transport/http/MockHTTPConduit.java | 163 +++++++++++++++++++++
 3 files changed, 220 insertions(+), 4 deletions(-)

diff --git a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
index e62f594..2bcc8bd 100644
--- a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
+++ b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
@@ -1242,10 +1242,10 @@ public abstract class HTTPConduit
                         ex.execute(runnable);
                     }
                 } catch (RejectedExecutionException rex) {
-                    if (allowCurrentThread
-                        && policy != null
+                    if (!allowCurrentThread
+                        || (policy != null
                         && policy.isSetAsyncExecuteTimeoutRejection()
-                        && policy.isAsyncExecuteTimeoutRejection()) {
+                        && policy.isAsyncExecuteTimeoutRejection())) {
                         throw rex;
                     }
                     if (!hasLoggedAsyncWarning) {
diff --git a/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java b/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java
index 343fb95..848e90a 100644
--- a/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java
+++ b/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java
@@ -26,16 +26,24 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
 
 import org.apache.cxf.Bus;
 import org.apache.cxf.bus.extension.ExtensionManagerBus;
 import org.apache.cxf.common.util.Base64Utility;
 import org.apache.cxf.configuration.security.AuthorizationPolicy;
+import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.endpoint.EndpointImpl;
 import org.apache.cxf.helpers.CastUtils;
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.ExchangeImpl;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageImpl;
 import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.http.HTTPConduit.WrappedOutputStream;
 import org.apache.cxf.transport.http.auth.HttpAuthSupplier;
+import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 import org.apache.cxf.ws.addressing.EndpointReferenceUtils;
 
@@ -58,7 +66,7 @@ public class HTTPConduitTest extends Assert {
     /**
      * Generates a new message.
      */
-    private Message getNewMessage() {
+    private Message getNewMessage() throws Exception {
         Message message = new MessageImpl();
         Map<String, List<String>> headers = new TreeMap<String, List<String>>(String.CASE_INSENSITIVE_ORDER);
         List<String> contentTypes = new ArrayList<>();
@@ -228,4 +236,49 @@ public class HTTPConduitTest extends Assert {
     }
 
 
+    @Test
+    public void testHandleResponseOnWorkqueueAllowCurrentThread() throws Exception {
+        Message m = getNewMessage();
+        Exchange exchange = new ExchangeImpl();
+        Bus bus = new ExtensionManagerBus();
+        exchange.put(Bus.class, bus);
+
+        EndpointInfo endpointInfo = new EndpointInfo();
+        Endpoint endpoint = new EndpointImpl(null, null, endpointInfo);
+        exchange.put(Endpoint.class, endpoint);
+
+        m.setExchange(exchange);
+
+        HTTPClientPolicy policy = new HTTPClientPolicy();
+        policy.setAsyncExecuteTimeoutRejection(true);
+        m.put(HTTPClientPolicy.class, policy);
+        exchange.put(Executor.class, new Executor() {
+
+            @Override
+            public void execute(Runnable command) {
+                // simulates a maxxed-out executor
+                // forces us to use current thread
+                throw new RejectedExecutionException("expected");
+            } });
+
+        HTTPConduit conduit = new MockHTTPConduit(bus, endpointInfo, policy);
+        OutputStream os = conduit.createOutputStream(m, false, false, 0);
+        assertTrue(os instanceof WrappedOutputStream);
+        WrappedOutputStream wos = (WrappedOutputStream) os;
+
+        try {
+            wos.handleResponseOnWorkqueue(true, false);
+            assertEquals(Thread.currentThread(), m.get(Thread.class));
+
+            try {
+                wos.handleResponseOnWorkqueue(false, false);
+                fail("Expected RejectedExecutionException not thrown");
+            } catch (RejectedExecutionException ex) {
+                assertEquals("expected", ex.getMessage());
+            }
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            throw ex;
+        }
+    }
 }
diff --git a/rt/transports/http/src/test/java/org/apache/cxf/transport/http/MockHTTPConduit.java b/rt/transports/http/src/test/java/org/apache/cxf/transport/http/MockHTTPConduit.java
new file mode 100644
index 0000000..94b5d12
--- /dev/null
+++ b/rt/transports/http/src/test/java/org/apache/cxf/transport/http/MockHTTPConduit.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.https.HttpsURLConnectionInfo;
+import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
+
+public class MockHTTPConduit extends HTTPConduit {
+
+    public MockHTTPConduit(Bus b, EndpointInfo ei, HTTPClientPolicy policy) throws IOException {
+        super(b, ei);
+        setClient(policy);
+    }
+
+    @Override
+    protected void setupConnection(Message message, Address address, HTTPClientPolicy csPolicy)
+        throws IOException {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    protected OutputStream createOutputStream(Message message, boolean needToCacheRequest, boolean isChunking,
+                                              int chunkThreshold)
+        throws IOException {
+        return new MockWrappedOutputStream(message, isChunking, isChunking, chunkThreshold, "mockConduit", null);
+    }
+
+    class MockWrappedOutputStream extends WrappedOutputStream {
+
+        protected MockWrappedOutputStream(Message outMessage, boolean possibleRetransmit, boolean isChunking,
+                                          int chunkThreshold, String conduitName, URI url) {
+            super(outMessage, possibleRetransmit, isChunking, chunkThreshold, conduitName, url);
+        }
+
+        @Override
+        protected void setupWrappedStream() throws IOException {
+            // TODO Auto-generated method stub
+            
+        }
+
+        @Override
+        protected HttpsURLConnectionInfo getHttpsURLConnectionInfo() throws IOException {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        protected void setProtocolHeaders() throws IOException {
+            // TODO Auto-generated method stub
+            
+        }
+
+        @Override
+        protected void setFixedLengthStreamingMode(int i) {
+            // TODO Auto-generated method stub
+            
+        }
+
+        @Override
+        protected int getResponseCode() throws IOException {
+            // TODO Auto-generated method stub
+            return 0;
+        }
+
+        @Override
+        protected String getResponseMessage() throws IOException {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        protected void updateResponseHeaders(Message inMessage) throws IOException {
+            // TODO Auto-generated method stub
+            
+        }
+
+        @Override
+        protected void handleResponseAsync() throws IOException {
+            // TODO Auto-generated method stub
+            
+        }
+
+        @Override
+        protected void handleResponseInternal() throws IOException {
+            outMessage.put(Thread.class, Thread.currentThread());
+        }
+
+        @Override
+        protected void closeInputStream() throws IOException {
+            // TODO Auto-generated method stub
+            
+        }
+
+        @Override
+        protected boolean usingProxy() {
+            // TODO Auto-generated method stub
+            return false;
+        }
+
+        @Override
+        protected InputStream getInputStream() throws IOException {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        protected InputStream getPartialResponse() throws IOException {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        protected void setupNewConnection(String newURL) throws IOException {
+            // TODO Auto-generated method stub
+            
+        }
+
+        @Override
+        protected void retransmitStream() throws IOException {
+            // TODO Auto-generated method stub
+            
+        }
+
+        @Override
+        protected void updateCookiesBeforeRetransmit() throws IOException {
+            // TODO Auto-generated method stub
+            
+        }
+
+        @Override
+        public void thresholdReached() throws IOException {
+            // TODO Auto-generated method stub
+            
+        }
+        
+    }
+}


[cxf] 02/03: CXF-7882 - FailoverTargetSelector uses the Exchange as the key in the inProgress map

Posted by co...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

coheigea pushed a commit to branch 3.2.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git

commit 0cbb7111a5da81b9c756d0445bd950ea72d6aa9f
Author: Colm O hEigeartaigh <co...@apache.org>
AuthorDate: Tue Oct 23 10:36:36 2018 +0100

    CXF-7882 - FailoverTargetSelector uses the Exchange as the key in the inProgress map
    
    (cherry picked from commit 0ae26aa4cdad1da742645c61c9f71eda48fd6169)
---
 .../cxf/clustering/CircuitBreakerTargetSelector.java       |  2 +-
 .../org/apache/cxf/clustering/FailoverTargetSelector.java  | 14 ++++++++------
 .../cxf/clustering/LoadDistributorTargetSelector.java      |  2 +-
 3 files changed, 10 insertions(+), 8 deletions(-)

diff --git a/rt/features/clustering/src/main/java/org/apache/cxf/clustering/CircuitBreakerTargetSelector.java b/rt/features/clustering/src/main/java/org/apache/cxf/clustering/CircuitBreakerTargetSelector.java
index 3010351..17e30f6 100644
--- a/rt/features/clustering/src/main/java/org/apache/cxf/clustering/CircuitBreakerTargetSelector.java
+++ b/rt/features/clustering/src/main/java/org/apache/cxf/clustering/CircuitBreakerTargetSelector.java
@@ -117,7 +117,7 @@ public class CircuitBreakerTargetSelector extends FailoverTargetSelector {
         }
         Exchange exchange = message.getExchange();
         InvocationKey key = new InvocationKey(exchange);
-        InvocationContext invocation = inProgress.get(key);
+        InvocationContext invocation = getInvocationContext(key);
         if (invocation != null && !invocation.getContext().containsKey(IS_SELECTED)) {
             final String address = (String) message.get(Message.ENDPOINT_ADDRESS);
 
diff --git a/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverTargetSelector.java b/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverTargetSelector.java
index d468ae1..8b0d532 100644
--- a/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverTargetSelector.java
+++ b/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverTargetSelector.java
@@ -51,8 +51,7 @@ public class FailoverTargetSelector extends AbstractConduitSelector {
     private static final String COMPLETE_IF_SERVICE_NOT_AVAIL_PROPERTY =
         "org.apache.cxf.transport.complete_if_service_not_available";
 
-    protected ConcurrentHashMap<InvocationKey, InvocationContext> inProgress
-        = new ConcurrentHashMap<InvocationKey, InvocationContext>();
+    protected ConcurrentHashMap<String, InvocationContext> inProgress = new ConcurrentHashMap<>();
     protected FailoverStrategy failoverStrategy;
     private boolean supportNotAvailableErrorsOnly = true;
     private String clientBootstrapAddress;
@@ -112,7 +111,7 @@ public class FailoverTargetSelector extends AbstractConduitSelector {
                                       bindingOperationInfo,
                                       params,
                                       context);
-            inProgress.putIfAbsent(key, invocation);
+            inProgress.putIfAbsent(String.valueOf(key.hashCode()), invocation);
         }
     }
 
@@ -138,7 +137,10 @@ public class FailoverTargetSelector extends AbstractConduitSelector {
     }
 
     protected InvocationContext getInvocationContext(InvocationKey key) {
-        return inProgress.get(key);
+        if (key != null) {
+            return inProgress.get(String.valueOf(key.hashCode()));
+        }
+        return null;
     }
 
     /**
@@ -175,7 +177,7 @@ public class FailoverTargetSelector extends AbstractConduitSelector {
         }
 
         if (!failover) {
-            inProgress.remove(key);
+            inProgress.remove(String.valueOf(key.hashCode()));
             doComplete(exchange);
         }
     }
@@ -405,7 +407,7 @@ public class FailoverTargetSelector extends AbstractConduitSelector {
 
                 Exchange exchange = message.getExchange();
                 InvocationKey key = new InvocationKey(exchange);
-                InvocationContext invocation = inProgress.get(key);
+                InvocationContext invocation = getInvocationContext(key);
                 if (invocation != null) {
                     overrideAddressProperty(invocation.getContext(),
                                             cond.getTarget().getAddress().getValue());
diff --git a/rt/features/clustering/src/main/java/org/apache/cxf/clustering/LoadDistributorTargetSelector.java b/rt/features/clustering/src/main/java/org/apache/cxf/clustering/LoadDistributorTargetSelector.java
index 5ef66f5..76f49c9 100644
--- a/rt/features/clustering/src/main/java/org/apache/cxf/clustering/LoadDistributorTargetSelector.java
+++ b/rt/features/clustering/src/main/java/org/apache/cxf/clustering/LoadDistributorTargetSelector.java
@@ -107,7 +107,7 @@ public class LoadDistributorTargetSelector extends FailoverTargetSelector {
         }
         Exchange exchange = message.getExchange();
         InvocationKey key = new InvocationKey(exchange);
-        InvocationContext invocation = inProgress.get(key);
+        InvocationContext invocation = getInvocationContext(key);
         if ((invocation != null) && !invocation.getContext().containsKey(IS_DISTRIBUTED)) {
             Endpoint target = getDistributionTarget(exchange, invocation);
             if (target != null) {


[cxf] 03/03: Recording .gitmergeinfo Changes

Posted by co...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

coheigea pushed a commit to branch 3.2.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git

commit 93a673d411d4517a5ce4a17694b84c317e56b443
Author: Colm O hEigeartaigh <co...@apache.org>
AuthorDate: Tue Oct 23 11:58:42 2018 +0100

    Recording .gitmergeinfo Changes
---
 .gitmergeinfo | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/.gitmergeinfo b/.gitmergeinfo
index 5dd08fd..eeb9d47 100644
--- a/.gitmergeinfo
+++ b/.gitmergeinfo
@@ -20,6 +20,7 @@ B 446d27383b01febfefd450206d52944a028e2c3e
 B 468cf8f3cfe9ef8285cd2ae66caddf666b7aaf13
 B 4b7ede2d6cda7651b686653e0645c40b6684b81f
 B 4da42032f95e667a402b113d6daf4bd0514c6d60
+B 55afdc9373257d8c111468926c7278b4e9b28a61
 B 58e72337226f4e963abfc6f1a65625d86b7003b5
 B 64dd999db1c712cd54f003a76abe1d221a1e3295
 B 68af13530379662fc99a716d635d4aab7c5fb027
@@ -38,6 +39,7 @@ B acc697f1c88f392bd952e07d50a9ae2ce0b76411
 B ae994168f50894010f1f148ec3b6f35b17e4b63b
 B b0094cc8813e56e2ec3694193df07d2d5de0ef66
 B b1b359393c986a2e8471852dd4d6e66fc37c1eac
+B b2b30af081861b2f62dd38a86babae694af27355
 B b3268466338750b742ee900bbec13e568527be2c
 B b8236c923ba409087e8db6132963924151918efc
 B b8fc2da105029c10cf24ed344198fbefdd190648
@@ -59,8 +61,10 @@ B e9d4d65a2bdc1e258d5fd1d470abbf68c1ee4d07
 B eb76ad6674586bef76a23332030a3dc310cfa235
 B f652464526edb0cb7eac277ff64c9f2d02e2b920
 B f68311f3de3e66526303b9292d07468499703eb6
+B f70768f7b7fa95305949a5b7df9bbc14bb48add3
 B fc656dc1311963f5fc833856984b36b7ccd26042
 B fe5603d060881cf801db69ade3f9790423d48094
+B fe8c6187ade3699e085623c92ddd3abcfe75a2a0
 B ffff30fdd8fc1b5987158d7d9fec8349b9d735d2
 M 05b8db050b158d82aad57081a0dbb5e7c92cc3b1
 M 0b920d7275800e651b2c48c982b1977249516ddf