You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ya...@apache.org on 2023/06/30 10:33:47 UTC
[spark] branch branch-3.4 updated: [SPARK-44241][CORE] Mistakenly set io.connectionTimeout/connectionCreationTimeout to zero or negative will cause incessant executor cons/destructions
This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new fe971de447d [SPARK-44241][CORE] Mistakenly set io.connectionTimeout/connectionCreationTimeout to zero or negative will cause incessant executor cons/destructions
fe971de447d is described below
commit fe971de447d55bf357c161a9f1930e822e38fa45
Author: Kent Yao <ya...@apache.org>
AuthorDate: Fri Jun 30 18:33:16 2023 +0800
[SPARK-44241][CORE] Mistakenly set io.connectionTimeout/connectionCreationTimeout to zero or negative will cause incessant executor cons/destructions
### What changes were proposed in this pull request?
This PR treats a negative io.connectionTimeout/connectionCreationTimeout as zero. Zero here means:
- connectionCreationTimeout = 0: an unlimited CONNECT_TIMEOUT_MILLIS for connection establishment.
- connectionTimeout = 0: the `IdleStateHandler` that triggers `IdleStateEvent` is disabled.
### Why are the changes needed?
1. This PR fixes a bug where a connectionCreationTimeout of 0, which Netty treats as "no timeout", still caused the connection attempt to fail immediately, because `ChannelFuture.await(0)` returns at once instead of waiting.
2. This PR fixes a bug where a negative connectionCreationTimeout caused meaningless transport client reconnections and endless executor reconstructions.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
new unit tests
Closes #41785 from yaooqinn/SPARK-44241.
Authored-by: Kent Yao <ya...@apache.org>
Signed-off-by: Kent Yao <ya...@apache.org>
(cherry picked from commit 38645fa470b5af7c2e41efa4fb092bdf2463fbbd)
Signed-off-by: Kent Yao <ya...@apache.org>
---
.../network/client/TransportClientFactory.java | 16 +++++++++--
.../apache/spark/network/util/TransportConf.java | 4 +--
.../client/TransportClientFactorySuite.java | 33 +++++++++++++++++++---
3 files changed, 44 insertions(+), 9 deletions(-)
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index 6fb9923cd3d..3df72e65c2a 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -245,12 +245,13 @@ public class TransportClientFactory implements Closeable {
logger.debug("Creating new connection to {}", address);
Bootstrap bootstrap = new Bootstrap();
+ int connCreateTimeout = conf.connectionCreationTimeoutMs();
bootstrap.group(workerGroup)
.channel(socketChannelClass)
// Disable Nagle's Algorithm since we don't want packets to wait
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
- .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionCreationTimeoutMs())
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connCreateTimeout)
.option(ChannelOption.ALLOCATOR, pooledAllocator);
if (conf.receiveBuf() > 0) {
@@ -276,10 +277,19 @@ public class TransportClientFactory implements Closeable {
// Connect to the remote server
long preConnect = System.nanoTime();
ChannelFuture cf = bootstrap.connect(address);
- if (!cf.await(conf.connectionCreationTimeoutMs())) {
+
+ if (connCreateTimeout <= 0) {
+ cf.awaitUninterruptibly();
+ assert cf.isDone();
+ if (cf.isCancelled()) {
+ throw new IOException(String.format("Connecting to %s cancelled", address));
+ } else if (!cf.isSuccess()) {
+ throw new IOException(String.format("Failed to connect to %s", address), cf.cause());
+ }
+ } else if (!cf.await(connCreateTimeout)) {
throw new IOException(
String.format("Connecting to %s timed out (%s ms)",
- address, conf.connectionCreationTimeoutMs()));
+ address, connCreateTimeout));
} else if (cf.cause() != null) {
throw new IOException(String.format("Failed to connect to %s", address), cf.cause());
}
diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index bbfb99168da..deac78ffedd 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -103,7 +103,7 @@ public class TransportConf {
conf.get("spark.network.timeout", "120s"));
long defaultTimeoutMs = JavaUtils.timeStringAsSec(
conf.get(SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY, defaultNetworkTimeoutS + "s")) * 1000;
- return (int) defaultTimeoutMs;
+ return defaultTimeoutMs < 0 ? 0 : (int) defaultTimeoutMs;
}
/** Connect creation timeout in milliseconds. Default 30 secs. */
@@ -111,7 +111,7 @@ public class TransportConf {
long connectionTimeoutS = TimeUnit.MILLISECONDS.toSeconds(connectionTimeoutMs());
long defaultTimeoutMs = JavaUtils.timeStringAsSec(
conf.get(SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY, connectionTimeoutS + "s")) * 1000;
- return (int) defaultTimeoutMs;
+ return defaultTimeoutMs < 0 ? 0 : (int) defaultTimeoutMs;
}
/** Number of concurrent connections between two nodes for fetching data. */
diff --git a/common/network-common/src/test/java/org/apache/spark/network/client/TransportClientFactorySuite.java b/common/network-common/src/test/java/org/apache/spark/network/client/TransportClientFactorySuite.java
index 4ee9a6ed10b..47b571af83d 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/client/TransportClientFactorySuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/client/TransportClientFactorySuite.java
@@ -31,10 +31,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertTrue;
-
import org.apache.spark.network.TestUtils;
import org.apache.spark.network.TransportContext;
import org.apache.spark.network.server.NoOpRpcHandler;
@@ -45,6 +41,8 @@ import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.TransportConf;
+import static org.junit.Assert.*;
+
public class TransportClientFactorySuite {
private TransportConf conf;
private TransportContext context;
@@ -237,4 +235,31 @@ public class TransportClientFactorySuite {
Assert.assertThrows("fail this connection directly", IOException.class,
() -> factory.createClient(TestUtils.getLocalHost(), unreachablePort, true));
}
+
+ @Test
+ public void unlimitedConnectionAndCreationTimeouts() throws IOException, InterruptedException {
+ Map<String, String> configMap = new HashMap<>();
+ configMap.put("spark.shuffle.io.connectionTimeout", "-1");
+ configMap.put("spark.shuffle.io.connectionCreationTimeout", "-1");
+ TransportConf conf = new TransportConf("shuffle", new MapConfigProvider(configMap));
+ RpcHandler rpcHandler = new NoOpRpcHandler();
+ try (TransportContext ctx = new TransportContext(conf, rpcHandler, true);
+ TransportClientFactory factory = ctx.createClientFactory()){
+ TransportClient c1 = factory.createClient(TestUtils.getLocalHost(), server1.getPort());
+ assertTrue(c1.isActive());
+ long expiredTime = System.currentTimeMillis() + 5000;
+ while (c1.isActive() && System.currentTimeMillis() < expiredTime) {
+ Thread.sleep(10);
+ }
+ assertTrue(c1.isActive());
+ // When connectionCreationTimeout is unlimited, the connection shall be able to
+ // fail when the server is not reachable.
+ TransportServer server = ctx.createServer();
+ int unreachablePort = server.getPort();
+ JavaUtils.closeQuietly(server);
+ IOException exception = Assert.assertThrows(IOException.class,
+ () -> factory.createClient(TestUtils.getLocalHost(), unreachablePort, true));
+ assertNotEquals(exception.getCause(), null);
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org