You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2024/01/28 15:21:24 UTC
(cxf) branch main updated: CXF-8955: Custom timeout with Message.RECEIVE_TIMEOUT not working with hc5 / hc (#1607)
This is an automated email from the ASF dual-hosted git repository.
reta pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cxf.git
The following commit(s) were added to refs/heads/main by this push:
new 0b0b85eb2d CXF-8955: Custom timeout with Message.RECEIVE_TIMEOUT not working with hc5 / hc (#1607)
0b0b85eb2d is described below
commit 0b0b85eb2dfae1cfa97d89993b79b0f611084bed
Author: Andriy Redko <dr...@gmail.com>
AuthorDate: Sun Jan 28 10:21:18 2024 -0500
CXF-8955: Custom timeout with Message.RECEIVE_TIMEOUT not working with hc5 / hc (#1607)
---
.../http/asyncclient/AsyncHTTPConduit.java | 17 +--
.../http/asyncclient/hc5/AsyncHTTPConduit.java | 16 +--
.../hc5/http/timeout/ClientTimeoutTest.java | 127 +++++++++++++++++++++
.../systest/hc5/http/timeout/DelayedService.java | 23 ++++
.../hc5/http/timeout/DelayedServiceImpl.java | 30 +++++
.../systest/http/timeout/ClientTimeoutTest.java | 127 +++++++++++++++++++++
.../cxf/systest/http/timeout/DelayedService.java | 23 ++++
.../systest/http/timeout/DelayedServiceImpl.java | 30 +++++
8 files changed, 364 insertions(+), 29 deletions(-)
diff --git a/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java b/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
index 098008039b..7325611b8b 100755
--- a/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
+++ b/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
@@ -140,7 +140,6 @@ public class AsyncHTTPConduit extends HttpClientHTTPConduit {
super.setupConnection(message, address, csPolicy);
return;
}
- propagateJaxwsSpecTimeoutSettings(message, csPolicy);
boolean addressChanged = false;
// need to do some clean up work on the URI address
URI uri = address.getURI();
@@ -231,8 +230,8 @@ public class AsyncHTTPConduit extends HttpClientHTTPConduit {
e.setEntity(entity);
RequestConfig.Builder b = RequestConfig.custom()
- .setConnectTimeout((int) csPolicy.getConnectionTimeout())
- .setSocketTimeout((int) csPolicy.getReceiveTimeout())
+ .setConnectTimeout(determineConnectionTimeout(message, csPolicy))
+ .setSocketTimeout(determineReceiveTimeout(message, csPolicy))
.setConnectionRequestTimeout((int) csPolicy.getConnectionRequestTimeout());
Proxy p = proxyFactory.createProxy(csPolicy, uri);
if (p != null && p.type() != Proxy.Type.DIRECT) {
@@ -245,18 +244,6 @@ public class AsyncHTTPConduit extends HttpClientHTTPConduit {
message.put(CXFHttpRequest.class, e);
}
- private void propagateJaxwsSpecTimeoutSettings(Message message, HTTPClientPolicy csPolicy) {
- int receiveTimeout = determineReceiveTimeout(message, csPolicy);
- if (csPolicy.getReceiveTimeout() == 60000) {
- csPolicy.setReceiveTimeout(receiveTimeout);
- }
- int connectionTimeout = determineConnectionTimeout(message, csPolicy);
- if (csPolicy.getConnectionTimeout() == 30000) {
- csPolicy.setConnectionTimeout(connectionTimeout);
- }
- }
-
-
protected OutputStream createOutputStream(Message message,
boolean needToCacheRequest,
boolean isChunking,
diff --git a/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduit.java b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduit.java
index 780a51772d..33351f17d6 100644
--- a/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduit.java
+++ b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduit.java
@@ -145,7 +145,6 @@ public class AsyncHTTPConduit extends HttpClientHTTPConduit {
return;
}
- propagateJaxwsSpecTimeoutSettings(message, csPolicy);
propagateProtocolSettings(message, csPolicy);
boolean addressChanged = false;
@@ -247,8 +246,8 @@ public class AsyncHTTPConduit extends HttpClientHTTPConduit {
final RequestConfig.Builder b = RequestConfig
.custom()
- .setConnectTimeout(Timeout.ofMilliseconds(csPolicy.getConnectionTimeout()))
- .setResponseTimeout(Timeout.ofMilliseconds(csPolicy.getReceiveTimeout()))
+ .setConnectTimeout(Timeout.ofMilliseconds(determineConnectionTimeout(message, csPolicy)))
+ .setResponseTimeout(Timeout.ofMilliseconds(determineReceiveTimeout(message, csPolicy)))
.setConnectionRequestTimeout(Timeout.ofMilliseconds(csPolicy.getConnectionRequestTimeout()));
final Proxy p = proxyFactory.createProxy(csPolicy, uri);
@@ -271,17 +270,6 @@ public class AsyncHTTPConduit extends HttpClientHTTPConduit {
}
}
- private void propagateJaxwsSpecTimeoutSettings(Message message, HTTPClientPolicy csPolicy) {
- int receiveTimeout = determineReceiveTimeout(message, csPolicy);
- if (csPolicy.getReceiveTimeout() == 60000) {
- csPolicy.setReceiveTimeout(receiveTimeout);
- }
- int connectionTimeout = determineConnectionTimeout(message, csPolicy);
- if (csPolicy.getConnectionTimeout() == 30000) {
- csPolicy.setConnectionTimeout(connectionTimeout);
- }
- }
-
@Override
protected OutputStream createOutputStream(Message message, boolean needToCacheRequest,
boolean isChunking, int chunkThreshold) throws IOException {
diff --git a/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/http/timeout/ClientTimeoutTest.java b/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/http/timeout/ClientTimeoutTest.java
new file mode 100644
index 0000000000..d37bb35bcb
--- /dev/null
+++ b/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/http/timeout/ClientTimeoutTest.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.systest.hc5.http.timeout;
+
+import java.net.SocketTimeoutException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.endpoint.Server;
+import org.apache.cxf.frontend.ClientProxyFactoryBean;
+import org.apache.cxf.frontend.ServerFactoryBean;
+import org.apache.cxf.interceptor.Fault;
+import org.apache.cxf.interceptor.MessageSenderInterceptor;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.phase.AbstractPhaseInterceptor;
+import org.apache.cxf.phase.Phase;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+import org.apache.cxf.testutil.common.TestUtil;
+import org.apache.cxf.transport.http.asyncclient.hc5.AsyncHTTPConduit;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+public class ClientTimeoutTest extends AbstractBusClientServerTestBase {
+ private static final String PORT = TestUtil.getNewPortNumber(ClientTimeoutTest.class);
+ private static Server server;
+
+ @BeforeClass
+ public static void setUp() {
+ final DelayedServiceImpl delayedImpl = new DelayedServiceImpl();
+ final ServerFactoryBean svrFactory = new ServerFactoryBean();
+ svrFactory.setServiceClass(DelayedService.class);
+ svrFactory.setAddress("http://localhost:" + PORT + "/Hello");
+ svrFactory.setServiceBean(delayedImpl);
+ server = svrFactory.create();
+ }
+
+ @Test
+ public void clientTimeoutWithParallelCalls() throws Exception {
+ final DelayedService client = buildClient();
+
+ // Start two SOAP calls in parallel; both will time out as set by CustomReceiveTimeoutInterceptor
+ final CompletableFuture<String> f = new CompletableFuture<>();
+ final Thread thread = new Thread(() -> {
+ try {
+ f.complete(client.delay(10 * 1000L));
+ } catch (Exception ex) {
+ f.completeExceptionally(ex);
+ }
+ });
+ thread.start();
+
+ // Wait a bit before scheduling another call
+ Thread.sleep(1000);
+
+ // Timeout for second call is 2000 millis.
+ final Fault f1 = assertThrows(Fault.class, () -> client.delay(10 * 1000L));
+ assertThat(f1.getCause(), instanceOf(SocketTimeoutException.class));
+ assertThat(f1.getCause().getMessage(), containsString("2000 MILLISECONDS"));
+
+ // Timeout for first call is 4000 millis.
+ final CompletionException f2 = assertThrows(CompletionException.class, () -> f.join());
+ assertThat(f2.getCause(), instanceOf(Fault.class));
+ assertThat(f2.getCause().getCause(), instanceOf(SocketTimeoutException.class));
+ assertThat(f2.getCause().getCause().getMessage(), containsString("4000 MILLISECONDS"));
+ }
+
+ private static DelayedService buildClient() {
+ final Bus bus = BusFactory.getThreadDefaultBus();
+ bus.setProperty(AsyncHTTPConduit.USE_ASYNC, true);
+
+ final ClientProxyFactoryBean factory = new ClientProxyFactoryBean();
+ factory.setAddress("http://localhost:" + PORT + "/Hello");
+ factory.getOutInterceptors().add(new CustomReceiveTimeoutInterceptor());
+
+ return factory.create(DelayedService.class);
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ server.destroy();
+ }
+
+ /**
+ * First call: Message.RECEIVE_TIMEOUT is set to 4000 millis
+ * Second call: Message.RECEIVE_TIMEOUT is set to 2000 millis
+ * ... and so on.
+ */
+ private static final class CustomReceiveTimeoutInterceptor extends AbstractPhaseInterceptor<Message> {
+ private volatile long timeoutMillis = 4000;
+
+ private CustomReceiveTimeoutInterceptor() {
+ super(Phase.PREPARE_SEND);
+ addBefore(MessageSenderInterceptor.class.getName());
+ }
+
+ @Override
+ public void handleMessage(Message message) {
+ message.put(Message.RECEIVE_TIMEOUT, timeoutMillis);
+ timeoutMillis /= 2;
+ }
+ }
+}
diff --git a/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/http/timeout/DelayedService.java b/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/http/timeout/DelayedService.java
new file mode 100644
index 0000000000..a486d51c09
--- /dev/null
+++ b/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/http/timeout/DelayedService.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.systest.hc5.http.timeout;
+
+public interface DelayedService {
+ String delay(long delay);
+}
diff --git a/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/http/timeout/DelayedServiceImpl.java b/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/http/timeout/DelayedServiceImpl.java
new file mode 100644
index 0000000000..bfcd894364
--- /dev/null
+++ b/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/http/timeout/DelayedServiceImpl.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.systest.hc5.http.timeout;
+
+public class DelayedServiceImpl implements DelayedService {
+ public String delay(long delay) {
+ try {
+ Thread.sleep(delay);
+ } catch (InterruptedException ex) {
+ /* do nothing */
+ }
+ return "Replied after " + delay + "ms";
+ }
+}
diff --git a/systests/transports/src/test/java/org/apache/cxf/systest/http/timeout/ClientTimeoutTest.java b/systests/transports/src/test/java/org/apache/cxf/systest/http/timeout/ClientTimeoutTest.java
new file mode 100644
index 0000000000..1444b9efbc
--- /dev/null
+++ b/systests/transports/src/test/java/org/apache/cxf/systest/http/timeout/ClientTimeoutTest.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.systest.http.timeout;
+
+import java.net.SocketTimeoutException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.endpoint.Server;
+import org.apache.cxf.frontend.ClientProxyFactoryBean;
+import org.apache.cxf.frontend.ServerFactoryBean;
+import org.apache.cxf.interceptor.Fault;
+import org.apache.cxf.interceptor.MessageSenderInterceptor;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.phase.AbstractPhaseInterceptor;
+import org.apache.cxf.phase.Phase;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+import org.apache.cxf.testutil.common.TestUtil;
+import org.apache.cxf.transport.http.asyncclient.AsyncHTTPConduit;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+public class ClientTimeoutTest extends AbstractBusClientServerTestBase {
+ private static final String PORT = TestUtil.getNewPortNumber(ClientTimeoutTest.class);
+ private static Server server;
+
+ @BeforeClass
+ public static void setUp() {
+ final DelayedServiceImpl delayedImpl = new DelayedServiceImpl();
+ final ServerFactoryBean svrFactory = new ServerFactoryBean();
+ svrFactory.setServiceClass(DelayedService.class);
+ svrFactory.setAddress("http://localhost:" + PORT + "/Hello");
+ svrFactory.setServiceBean(delayedImpl);
+ server = svrFactory.create();
+ }
+
+ @Test
+ public void clientTimeoutWithParallelCalls() throws Exception {
+ final DelayedService client = buildClient();
+
+ // Start two SOAP calls in parallel; both will time out as set by CustomReceiveTimeoutInterceptor
+ final CompletableFuture<String> f = new CompletableFuture<>();
+ final Thread thread = new Thread(() -> {
+ try {
+ f.complete(client.delay(10 * 1000L));
+ } catch (Exception ex) {
+ f.completeExceptionally(ex);
+ }
+ });
+ thread.start();
+
+ // Wait a bit before scheduling another call
+ Thread.sleep(1000);
+
+ // Timeout for second call is 2000 millis.
+ final Fault f1 = assertThrows(Fault.class, () -> client.delay(10 * 1000L));
+ assertThat(f1.getCause(), instanceOf(SocketTimeoutException.class));
+ assertThat(f1.getCause().getMessage(), containsString("2,000 milliseconds timeout"));
+
+ // Timeout for first call is 4000 millis.
+ final CompletionException f2 = assertThrows(CompletionException.class, () -> f.join());
+ assertThat(f2.getCause(), instanceOf(Fault.class));
+ assertThat(f2.getCause().getCause(), instanceOf(SocketTimeoutException.class));
+ assertThat(f2.getCause().getCause().getMessage(), containsString("4,000 milliseconds timeout"));
+ }
+
+ private static DelayedService buildClient() {
+ final Bus bus = BusFactory.getThreadDefaultBus();
+ bus.setProperty(AsyncHTTPConduit.USE_ASYNC, true);
+
+ final ClientProxyFactoryBean factory = new ClientProxyFactoryBean();
+ factory.setAddress("http://localhost:" + PORT + "/Hello");
+ factory.getOutInterceptors().add(new CustomReceiveTimeoutInterceptor());
+
+ return factory.create(DelayedService.class);
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ server.destroy();
+ }
+
+ /**
+ * First call: Message.RECEIVE_TIMEOUT is set to 4000 millis
+ * Second call: Message.RECEIVE_TIMEOUT is set to 2000 millis
+ * ... and so on.
+ */
+ private static final class CustomReceiveTimeoutInterceptor extends AbstractPhaseInterceptor<Message> {
+ private volatile long timeoutMillis = 4000;
+
+ private CustomReceiveTimeoutInterceptor() {
+ super(Phase.PREPARE_SEND);
+ addBefore(MessageSenderInterceptor.class.getName());
+ }
+
+ @Override
+ public void handleMessage(Message message) {
+ message.put(Message.RECEIVE_TIMEOUT, timeoutMillis);
+ timeoutMillis /= 2;
+ }
+ }
+}
diff --git a/systests/transports/src/test/java/org/apache/cxf/systest/http/timeout/DelayedService.java b/systests/transports/src/test/java/org/apache/cxf/systest/http/timeout/DelayedService.java
new file mode 100644
index 0000000000..6a4c3a8cda
--- /dev/null
+++ b/systests/transports/src/test/java/org/apache/cxf/systest/http/timeout/DelayedService.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.systest.http.timeout;
+
+public interface DelayedService {
+ String delay(long delay);
+}
diff --git a/systests/transports/src/test/java/org/apache/cxf/systest/http/timeout/DelayedServiceImpl.java b/systests/transports/src/test/java/org/apache/cxf/systest/http/timeout/DelayedServiceImpl.java
new file mode 100644
index 0000000000..d86e566cea
--- /dev/null
+++ b/systests/transports/src/test/java/org/apache/cxf/systest/http/timeout/DelayedServiceImpl.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.systest.http.timeout;
+
+public class DelayedServiceImpl implements DelayedService {
+ public String delay(long delay) {
+ try {
+ Thread.sleep(delay);
+ } catch (InterruptedException ex) {
+ /* do nothing */
+ }
+ return "Replied after " + delay + "ms";
+ }
+}