You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ya...@apache.org on 2023/06/30 10:33:47 UTC

[spark] branch branch-3.4 updated: [SPARK-44241][CORE] Mistakenly set io.connectionTimeout/connectionCreationTimeout to zero or negative will cause incessant executor cons/destructions

This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new fe971de447d [SPARK-44241][CORE] Mistakenly set io.connectionTimeout/connectionCreationTimeout to zero or negative will cause incessant executor cons/destructions
fe971de447d is described below

commit fe971de447d55bf357c161a9f1930e822e38fa45
Author: Kent Yao <ya...@apache.org>
AuthorDate: Fri Jun 30 18:33:16 2023 +0800

    [SPARK-44241][CORE] Mistakenly set io.connectionTimeout/connectionCreationTimeout to zero or negative will cause incessant executor cons/destructions
    
    ### What changes were proposed in this pull request?
    
    This PR makes zero when io.connectionTimeout/connectionCreationTimeout is negative. Zero here means
    - connectionCreationTimeout = 0,an unlimited CONNNETION_TIMEOUT for connection establishment
    - connectionTimeout=0, `IdleStateHandler` for triggering `IdleStateEvent` is disabled.
    
    ### Why are the changes needed?
    
    1. This PR fixes a bug when connectionCreationTimeout is 0, which means unlimited to netty, but ChannelFuture.await(0) fails directly and inappropriately.
    2. This PR fixes a bug when connectionCreationTimeout is less than 0, which causes meaningless transport client reconnections and endless executor reconstructions
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    new unit tests
    
    Closes #41785 from yaooqinn/SPARK-44241.
    
    Authored-by: Kent Yao <ya...@apache.org>
    Signed-off-by: Kent Yao <ya...@apache.org>
    (cherry picked from commit 38645fa470b5af7c2e41efa4fb092bdf2463fbbd)
    Signed-off-by: Kent Yao <ya...@apache.org>
---
 .../network/client/TransportClientFactory.java     | 16 +++++++++--
 .../apache/spark/network/util/TransportConf.java   |  4 +--
 .../client/TransportClientFactorySuite.java        | 33 +++++++++++++++++++---
 3 files changed, 44 insertions(+), 9 deletions(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index 6fb9923cd3d..3df72e65c2a 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -245,12 +245,13 @@ public class TransportClientFactory implements Closeable {
     logger.debug("Creating new connection to {}", address);
 
     Bootstrap bootstrap = new Bootstrap();
+    int connCreateTimeout = conf.connectionCreationTimeoutMs();
     bootstrap.group(workerGroup)
       .channel(socketChannelClass)
       // Disable Nagle's Algorithm since we don't want packets to wait
       .option(ChannelOption.TCP_NODELAY, true)
       .option(ChannelOption.SO_KEEPALIVE, true)
-      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionCreationTimeoutMs())
+      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connCreateTimeout)
       .option(ChannelOption.ALLOCATOR, pooledAllocator);
 
     if (conf.receiveBuf() > 0) {
@@ -276,10 +277,19 @@ public class TransportClientFactory implements Closeable {
     // Connect to the remote server
     long preConnect = System.nanoTime();
     ChannelFuture cf = bootstrap.connect(address);
-    if (!cf.await(conf.connectionCreationTimeoutMs())) {
+
+    if (connCreateTimeout <= 0) {
+      cf.awaitUninterruptibly();
+      assert cf.isDone();
+      if (cf.isCancelled()) {
+        throw new IOException(String.format("Connecting to %s cancelled", address));
+      } else if (!cf.isSuccess()) {
+        throw new IOException(String.format("Failed to connect to %s", address), cf.cause());
+      }
+    } else if (!cf.await(connCreateTimeout)) {
       throw new IOException(
         String.format("Connecting to %s timed out (%s ms)",
-          address, conf.connectionCreationTimeoutMs()));
+          address, connCreateTimeout));
     } else if (cf.cause() != null) {
       throw new IOException(String.format("Failed to connect to %s", address), cf.cause());
     }
diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index bbfb99168da..deac78ffedd 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -103,7 +103,7 @@ public class TransportConf {
       conf.get("spark.network.timeout", "120s"));
     long defaultTimeoutMs = JavaUtils.timeStringAsSec(
       conf.get(SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY, defaultNetworkTimeoutS + "s")) * 1000;
-    return (int) defaultTimeoutMs;
+    return defaultTimeoutMs < 0 ? 0 : (int) defaultTimeoutMs;
   }
 
   /** Connect creation timeout in milliseconds. Default 30 secs. */
@@ -111,7 +111,7 @@ public class TransportConf {
     long connectionTimeoutS = TimeUnit.MILLISECONDS.toSeconds(connectionTimeoutMs());
     long defaultTimeoutMs = JavaUtils.timeStringAsSec(
       conf.get(SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY,  connectionTimeoutS + "s")) * 1000;
-    return (int) defaultTimeoutMs;
+    return defaultTimeoutMs < 0 ? 0 : (int) defaultTimeoutMs;
   }
 
   /** Number of concurrent connections between two nodes for fetching data. */
diff --git a/common/network-common/src/test/java/org/apache/spark/network/client/TransportClientFactorySuite.java b/common/network-common/src/test/java/org/apache/spark/network/client/TransportClientFactorySuite.java
index 4ee9a6ed10b..47b571af83d 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/client/TransportClientFactorySuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/client/TransportClientFactorySuite.java
@@ -31,10 +31,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertTrue;
-
 import org.apache.spark.network.TestUtils;
 import org.apache.spark.network.TransportContext;
 import org.apache.spark.network.server.NoOpRpcHandler;
@@ -45,6 +41,8 @@ import org.apache.spark.network.util.MapConfigProvider;
 import org.apache.spark.network.util.JavaUtils;
 import org.apache.spark.network.util.TransportConf;
 
+import static org.junit.Assert.*;
+
 public class TransportClientFactorySuite {
   private TransportConf conf;
   private TransportContext context;
@@ -237,4 +235,31 @@ public class TransportClientFactorySuite {
     Assert.assertThrows("fail this connection directly", IOException.class,
       () -> factory.createClient(TestUtils.getLocalHost(), unreachablePort, true));
   }
+
+  @Test
+  public void unlimitedConnectionAndCreationTimeouts() throws IOException, InterruptedException {
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put("spark.shuffle.io.connectionTimeout", "-1");
+    configMap.put("spark.shuffle.io.connectionCreationTimeout", "-1");
+    TransportConf conf = new TransportConf("shuffle", new MapConfigProvider(configMap));
+    RpcHandler rpcHandler = new NoOpRpcHandler();
+    try (TransportContext ctx = new TransportContext(conf, rpcHandler, true);
+      TransportClientFactory factory = ctx.createClientFactory()){
+      TransportClient c1 = factory.createClient(TestUtils.getLocalHost(), server1.getPort());
+      assertTrue(c1.isActive());
+      long expiredTime = System.currentTimeMillis() + 5000;
+      while (c1.isActive() && System.currentTimeMillis() < expiredTime) {
+        Thread.sleep(10);
+      }
+      assertTrue(c1.isActive());
+      // When connectionCreationTimeout is unlimited, the connection shall be able to
+      // fail when the server is not reachable.
+      TransportServer server = ctx.createServer();
+      int unreachablePort = server.getPort();
+      JavaUtils.closeQuietly(server);
+      IOException exception = Assert.assertThrows(IOException.class,
+          () -> factory.createClient(TestUtils.getLocalHost(), unreachablePort, true));
+      assertNotEquals(exception.getCause(), null);
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org