You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2024/01/28 15:21:24 UTC

(cxf) branch main updated: CXF-8955: Custom timeout with Message.RECEIVE_TIMEOUT not working with hc5 / hc (#1607)

This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cxf.git


The following commit(s) were added to refs/heads/main by this push:
     new 0b0b85eb2d CXF-8955: Custom timeout with Message.RECEIVE_TIMEOUT not working with hc5 / hc (#1607)
0b0b85eb2d is described below

commit 0b0b85eb2dfae1cfa97d89993b79b0f611084bed
Author: Andriy Redko <dr...@gmail.com>
AuthorDate: Sun Jan 28 10:21:18 2024 -0500

    CXF-8955: Custom timeout with Message.RECEIVE_TIMEOUT not working with hc5 / hc (#1607)
---
 .../http/asyncclient/AsyncHTTPConduit.java         |  17 +--
 .../http/asyncclient/hc5/AsyncHTTPConduit.java     |  16 +--
 .../hc5/http/timeout/ClientTimeoutTest.java        | 127 +++++++++++++++++++++
 .../systest/hc5/http/timeout/DelayedService.java   |  23 ++++
 .../hc5/http/timeout/DelayedServiceImpl.java       |  30 +++++
 .../systest/http/timeout/ClientTimeoutTest.java    | 127 +++++++++++++++++++++
 .../cxf/systest/http/timeout/DelayedService.java   |  23 ++++
 .../systest/http/timeout/DelayedServiceImpl.java   |  30 +++++
 8 files changed, 364 insertions(+), 29 deletions(-)

diff --git a/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java b/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
index 098008039b..7325611b8b 100755
--- a/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
+++ b/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
@@ -140,7 +140,6 @@ public class AsyncHTTPConduit extends HttpClientHTTPConduit {
             super.setupConnection(message, address, csPolicy);
             return;
         }
-        propagateJaxwsSpecTimeoutSettings(message, csPolicy);
         boolean addressChanged = false;
         // need to do some clean up work on the URI address
         URI uri = address.getURI();
@@ -231,8 +230,8 @@ public class AsyncHTTPConduit extends HttpClientHTTPConduit {
         e.setEntity(entity);
 
         RequestConfig.Builder b = RequestConfig.custom()
-                .setConnectTimeout((int) csPolicy.getConnectionTimeout())
-                .setSocketTimeout((int) csPolicy.getReceiveTimeout())
+                .setConnectTimeout(determineConnectionTimeout(message, csPolicy))
+                .setSocketTimeout(determineReceiveTimeout(message, csPolicy))
                 .setConnectionRequestTimeout((int) csPolicy.getConnectionRequestTimeout());
         Proxy p = proxyFactory.createProxy(csPolicy, uri);
         if (p != null && p.type() != Proxy.Type.DIRECT) {
@@ -245,18 +244,6 @@ public class AsyncHTTPConduit extends HttpClientHTTPConduit {
         message.put(CXFHttpRequest.class, e);
     }
 
-    private void propagateJaxwsSpecTimeoutSettings(Message message, HTTPClientPolicy csPolicy) {
-        int receiveTimeout = determineReceiveTimeout(message, csPolicy);
-        if (csPolicy.getReceiveTimeout() == 60000) {
-            csPolicy.setReceiveTimeout(receiveTimeout);
-        }
-        int connectionTimeout = determineConnectionTimeout(message, csPolicy);
-        if (csPolicy.getConnectionTimeout() == 30000) {
-            csPolicy.setConnectionTimeout(connectionTimeout);
-        }
-    }
-
-
     protected OutputStream createOutputStream(Message message,
                                               boolean needToCacheRequest,
                                               boolean isChunking,
diff --git a/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduit.java b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduit.java
index 780a51772d..33351f17d6 100644
--- a/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduit.java
+++ b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduit.java
@@ -145,7 +145,6 @@ public class AsyncHTTPConduit extends HttpClientHTTPConduit {
             return;
         }
 
-        propagateJaxwsSpecTimeoutSettings(message, csPolicy);
         propagateProtocolSettings(message, csPolicy);
 
         boolean addressChanged = false;
@@ -247,8 +246,8 @@ public class AsyncHTTPConduit extends HttpClientHTTPConduit {
 
         final RequestConfig.Builder b = RequestConfig
             .custom()
-            .setConnectTimeout(Timeout.ofMilliseconds(csPolicy.getConnectionTimeout()))
-            .setResponseTimeout(Timeout.ofMilliseconds(csPolicy.getReceiveTimeout()))
+            .setConnectTimeout(Timeout.ofMilliseconds(determineConnectionTimeout(message, csPolicy)))
+            .setResponseTimeout(Timeout.ofMilliseconds(determineReceiveTimeout(message, csPolicy)))
             .setConnectionRequestTimeout(Timeout.ofMilliseconds(csPolicy.getConnectionRequestTimeout()));
         
         final Proxy p = proxyFactory.createProxy(csPolicy, uri);
@@ -271,17 +270,6 @@ public class AsyncHTTPConduit extends HttpClientHTTPConduit {
         }
     }
 
-    private void propagateJaxwsSpecTimeoutSettings(Message message, HTTPClientPolicy csPolicy) {
-        int receiveTimeout = determineReceiveTimeout(message, csPolicy);
-        if (csPolicy.getReceiveTimeout() == 60000) {
-            csPolicy.setReceiveTimeout(receiveTimeout);
-        }
-        int connectionTimeout = determineConnectionTimeout(message, csPolicy);
-        if (csPolicy.getConnectionTimeout() == 30000) {
-            csPolicy.setConnectionTimeout(connectionTimeout);
-        }
-    }
-
     @Override
     protected OutputStream createOutputStream(Message message, boolean needToCacheRequest,
             boolean isChunking, int chunkThreshold) throws IOException {
diff --git a/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/http/timeout/ClientTimeoutTest.java b/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/http/timeout/ClientTimeoutTest.java
new file mode 100644
index 0000000000..d37bb35bcb
--- /dev/null
+++ b/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/http/timeout/ClientTimeoutTest.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.systest.hc5.http.timeout;
+
+import java.net.SocketTimeoutException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.endpoint.Server;
+import org.apache.cxf.frontend.ClientProxyFactoryBean;
+import org.apache.cxf.frontend.ServerFactoryBean;
+import org.apache.cxf.interceptor.Fault;
+import org.apache.cxf.interceptor.MessageSenderInterceptor;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.phase.AbstractPhaseInterceptor;
+import org.apache.cxf.phase.Phase;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+import org.apache.cxf.testutil.common.TestUtil;
+import org.apache.cxf.transport.http.asyncclient.hc5.AsyncHTTPConduit;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+public class ClientTimeoutTest extends AbstractBusClientServerTestBase {
+    private static final String PORT = TestUtil.getNewPortNumber(ClientTimeoutTest.class);
+    private static Server server;
+
+    @BeforeClass
+    public static void setUp() {
+        final DelayedServiceImpl delayedImpl = new DelayedServiceImpl();
+        final ServerFactoryBean svrFactory = new ServerFactoryBean();
+        svrFactory.setServiceClass(DelayedService.class);
+        svrFactory.setAddress("http://localhost:" + PORT + "/Hello");
+        svrFactory.setServiceBean(delayedImpl);
+        server = svrFactory.create();
+    }
+
+    @Test
+    public void clientTimeoutWithParallelCalls() throws Exception {
+        final DelayedService client = buildClient();
+
+        // Start two SOAP calls in parallel; both will hit the timeout set by CustomReceiveTimeoutInterceptor
+        final CompletableFuture<String> f = new CompletableFuture<>();
+        final Thread thread = new Thread(() -> {
+            try {
+                f.complete(client.delay(10 * 1000L));
+            } catch (Exception ex) {
+                f.completeExceptionally(ex);
+            }
+        });
+        thread.start();
+
+        // Wait a bit before scheduling another call
+        Thread.sleep(1000);
+
+        // Timeout for second call is 2000 millis.
+        final Fault f1 = assertThrows(Fault.class, () -> client.delay(10 * 1000L));
+        assertThat(f1.getCause(), instanceOf(SocketTimeoutException.class));
+        assertThat(f1.getCause().getMessage(), containsString("2000 MILLISECONDS"));
+
+        // Timeout for first call is 4000 millis.
+        final CompletionException f2 = assertThrows(CompletionException.class, () -> f.join());
+        assertThat(f2.getCause(), instanceOf(Fault.class));
+        assertThat(f2.getCause().getCause(), instanceOf(SocketTimeoutException.class));
+        assertThat(f2.getCause().getCause().getMessage(), containsString("4000 MILLISECONDS"));
+    }
+
+    private static DelayedService buildClient() {
+        final Bus bus = BusFactory.getThreadDefaultBus();
+        bus.setProperty(AsyncHTTPConduit.USE_ASYNC, true);
+
+        final ClientProxyFactoryBean factory = new ClientProxyFactoryBean();
+        factory.setAddress("http://localhost:" + PORT + "/Hello");
+        factory.getOutInterceptors().add(new CustomReceiveTimeoutInterceptor());
+
+        return factory.create(DelayedService.class);
+    }
+
+    @AfterClass
+    public static void tearDown() {
+        server.destroy();
+    }
+    
+    /**
+     * First call: Message.RECEIVE_TIMEOUT is set to 4000 millis
+     * Second call: Message.RECEIVE_TIMEOUT is set to 2000 millis
+     * ... and so on.
+     */
+    private static final class CustomReceiveTimeoutInterceptor extends AbstractPhaseInterceptor<Message> {
+        private volatile long timeoutMillis = 4000;
+
+        private CustomReceiveTimeoutInterceptor() {
+            super(Phase.PREPARE_SEND);
+            addBefore(MessageSenderInterceptor.class.getName());
+        }
+
+        @Override
+        public void handleMessage(Message message) {
+            message.put(Message.RECEIVE_TIMEOUT, timeoutMillis);
+            timeoutMillis /= 2;
+        }
+    }
+}
diff --git a/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/http/timeout/DelayedService.java b/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/http/timeout/DelayedService.java
new file mode 100644
index 0000000000..a486d51c09
--- /dev/null
+++ b/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/http/timeout/DelayedService.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.systest.hc5.http.timeout;
+
+public interface DelayedService {
+    String delay(long delay);
+}
diff --git a/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/http/timeout/DelayedServiceImpl.java b/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/http/timeout/DelayedServiceImpl.java
new file mode 100644
index 0000000000..bfcd894364
--- /dev/null
+++ b/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/http/timeout/DelayedServiceImpl.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.systest.hc5.http.timeout;
+
+public class DelayedServiceImpl implements DelayedService {
+    public String delay(long delay) {
+        try {
+            Thread.sleep(delay);
+        } catch (InterruptedException ex) {
+            /* do nothing */
+        }
+        return "Replied after " + delay + "ms";
+    }
+}
diff --git a/systests/transports/src/test/java/org/apache/cxf/systest/http/timeout/ClientTimeoutTest.java b/systests/transports/src/test/java/org/apache/cxf/systest/http/timeout/ClientTimeoutTest.java
new file mode 100644
index 0000000000..1444b9efbc
--- /dev/null
+++ b/systests/transports/src/test/java/org/apache/cxf/systest/http/timeout/ClientTimeoutTest.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.systest.http.timeout;
+
+import java.net.SocketTimeoutException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.endpoint.Server;
+import org.apache.cxf.frontend.ClientProxyFactoryBean;
+import org.apache.cxf.frontend.ServerFactoryBean;
+import org.apache.cxf.interceptor.Fault;
+import org.apache.cxf.interceptor.MessageSenderInterceptor;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.phase.AbstractPhaseInterceptor;
+import org.apache.cxf.phase.Phase;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+import org.apache.cxf.testutil.common.TestUtil;
+import org.apache.cxf.transport.http.asyncclient.AsyncHTTPConduit;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+public class ClientTimeoutTest extends AbstractBusClientServerTestBase {
+    private static final String PORT = TestUtil.getNewPortNumber(ClientTimeoutTest.class);
+    private static Server server;
+
+    @BeforeClass
+    public static void setUp() {
+        final DelayedServiceImpl delayedImpl = new DelayedServiceImpl();
+        final ServerFactoryBean svrFactory = new ServerFactoryBean();
+        svrFactory.setServiceClass(DelayedService.class);
+        svrFactory.setAddress("http://localhost:" + PORT + "/Hello");
+        svrFactory.setServiceBean(delayedImpl);
+        server = svrFactory.create();
+    }
+
+    @Test
+    public void clientTimeoutWithParallelCalls() throws Exception {
+        final DelayedService client = buildClient();
+
+        // Start two SOAP calls in parallel; both will hit the timeout set by CustomReceiveTimeoutInterceptor
+        final CompletableFuture<String> f = new CompletableFuture<>();
+        final Thread thread = new Thread(() -> {
+            try {
+                f.complete(client.delay(10 * 1000L));
+            } catch (Exception ex) {
+                f.completeExceptionally(ex);
+            }
+        });
+        thread.start();
+
+        // Wait a bit before scheduling another call
+        Thread.sleep(1000);
+
+        // Timeout for second call is 2000 millis.
+        final Fault f1 = assertThrows(Fault.class, () -> client.delay(10 * 1000L));
+        assertThat(f1.getCause(), instanceOf(SocketTimeoutException.class));
+        assertThat(f1.getCause().getMessage(), containsString("2,000 milliseconds timeout"));
+
+        // Timeout for first call is 4000 millis.
+        final CompletionException f2 = assertThrows(CompletionException.class, () -> f.join());
+        assertThat(f2.getCause(), instanceOf(Fault.class));
+        assertThat(f2.getCause().getCause(), instanceOf(SocketTimeoutException.class));
+        assertThat(f2.getCause().getCause().getMessage(), containsString("4,000 milliseconds timeout"));
+    }
+
+    private static DelayedService buildClient() {
+        final Bus bus = BusFactory.getThreadDefaultBus();
+        bus.setProperty(AsyncHTTPConduit.USE_ASYNC, true);
+
+        final ClientProxyFactoryBean factory = new ClientProxyFactoryBean();
+        factory.setAddress("http://localhost:" + PORT + "/Hello");
+        factory.getOutInterceptors().add(new CustomReceiveTimeoutInterceptor());
+
+        return factory.create(DelayedService.class);
+    }
+
+    @AfterClass
+    public static void tearDown() {
+        server.destroy();
+    }
+    
+    /**
+     * First call: Message.RECEIVE_TIMEOUT is set to 4000 millis
+     * Second call: Message.RECEIVE_TIMEOUT is set to 2000 millis
+     * ... and so on.
+     */
+    private static final class CustomReceiveTimeoutInterceptor extends AbstractPhaseInterceptor<Message> {
+        private volatile long timeoutMillis = 4000;
+
+        private CustomReceiveTimeoutInterceptor() {
+            super(Phase.PREPARE_SEND);
+            addBefore(MessageSenderInterceptor.class.getName());
+        }
+
+        @Override
+        public void handleMessage(Message message) {
+            message.put(Message.RECEIVE_TIMEOUT, timeoutMillis);
+            timeoutMillis /= 2;
+        }
+    }
+}
diff --git a/systests/transports/src/test/java/org/apache/cxf/systest/http/timeout/DelayedService.java b/systests/transports/src/test/java/org/apache/cxf/systest/http/timeout/DelayedService.java
new file mode 100644
index 0000000000..6a4c3a8cda
--- /dev/null
+++ b/systests/transports/src/test/java/org/apache/cxf/systest/http/timeout/DelayedService.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.systest.http.timeout;
+
+public interface DelayedService {
+    String delay(long delay);
+}
diff --git a/systests/transports/src/test/java/org/apache/cxf/systest/http/timeout/DelayedServiceImpl.java b/systests/transports/src/test/java/org/apache/cxf/systest/http/timeout/DelayedServiceImpl.java
new file mode 100644
index 0000000000..d86e566cea
--- /dev/null
+++ b/systests/transports/src/test/java/org/apache/cxf/systest/http/timeout/DelayedServiceImpl.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.systest.http.timeout;
+
+public class DelayedServiceImpl implements DelayedService {
+    public String delay(long delay) {
+        try {
+            Thread.sleep(delay);
+        } catch (InterruptedException ex) {
+            /* do nothing */
+        }
+        return "Replied after " + delay + "ms";
+    }
+}