You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/06/21 13:54:35 UTC

spark git commit: [SPARK-20640][CORE] Make rpc timeout and retry for shuffle registration configurable.

Repository: spark
Updated Branches:
  refs/heads/master 9ce714dca -> d107b3b91


[SPARK-20640][CORE] Make rpc timeout and retry for shuffle registration configurable.

## What changes were proposed in this pull request?

Currently, the shuffle service registration timeout and retry count are hardcoded. This works well for small workloads. Under heavy workloads, however, the shuffle service may be busy transferring large amounts of data, and we see significant delays in its responses to registration requests. As a result, executors often fail to register with the shuffle service, which eventually fails the job. We need to make these two parameters configurable.

## How was this patch tested?

* Updated `BlockManagerSuite` to test that the registration timeout and max-attempts configurations actually work.

cc sitalkedia

Author: Li Yichao <ly...@zhihu.com>

Closes #18092 from liyichao/SPARK-20640.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d107b3b9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d107b3b9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d107b3b9

Branch: refs/heads/master
Commit: d107b3b910d8f434fb15b663a9db4c2dfe0a9f43
Parents: 9ce714d
Author: Li Yichao <ly...@zhihu.com>
Authored: Wed Jun 21 21:54:29 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed Jun 21 21:54:29 2017 +0800

----------------------------------------------------------------------
 .../network/shuffle/ExternalShuffleClient.java  |  7 +-
 .../mesos/MesosExternalShuffleClient.java       |  5 +-
 .../ExternalShuffleIntegrationSuite.java        |  4 +-
 .../shuffle/ExternalShuffleSecuritySuite.java   |  2 +-
 .../apache/spark/internal/config/package.scala  | 13 ++++
 .../org/apache/spark/storage/BlockManager.scala |  7 +-
 .../spark/storage/BlockManagerSuite.scala       | 68 ++++++++++++++++++--
 docs/configuration.md                           | 14 ++++
 .../MesosCoarseGrainedSchedulerBackend.scala    |  4 +-
 9 files changed, 109 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d107b3b9/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
index 269fa72..6ac9302 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
@@ -49,6 +49,7 @@ public class ExternalShuffleClient extends ShuffleClient {
   private final TransportConf conf;
   private final boolean authEnabled;
   private final SecretKeyHolder secretKeyHolder;
+  private final long registrationTimeoutMs;
 
   protected TransportClientFactory clientFactory;
   protected String appId;
@@ -60,10 +61,12 @@ public class ExternalShuffleClient extends ShuffleClient {
   public ExternalShuffleClient(
       TransportConf conf,
       SecretKeyHolder secretKeyHolder,
-      boolean authEnabled) {
+      boolean authEnabled,
+      long registrationTimeoutMs) {
     this.conf = conf;
     this.secretKeyHolder = secretKeyHolder;
     this.authEnabled = authEnabled;
+    this.registrationTimeoutMs = registrationTimeoutMs;
   }
 
   protected void checkInit() {
@@ -132,7 +135,7 @@ public class ExternalShuffleClient extends ShuffleClient {
     checkInit();
     try (TransportClient client = clientFactory.createUnmanagedClient(host, port)) {
       ByteBuffer registerMessage = new RegisterExecutor(appId, execId, executorInfo).toByteBuffer();
-      client.sendRpcSync(registerMessage, 5000 /* timeoutMs */);
+      client.sendRpcSync(registerMessage, registrationTimeoutMs);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d107b3b9/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
index dbc1010..60179f1 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
@@ -60,8 +60,9 @@ public class MesosExternalShuffleClient extends ExternalShuffleClient {
   public MesosExternalShuffleClient(
       TransportConf conf,
       SecretKeyHolder secretKeyHolder,
-      boolean authEnabled) {
-    super(conf, secretKeyHolder, authEnabled);
+      boolean authEnabled,
+      long registrationTimeoutMs) {
+    super(conf, secretKeyHolder, authEnabled, registrationTimeoutMs);
   }
 
   public void registerDriverWithShuffleService(

http://git-wip-us.apache.org/repos/asf/spark/blob/d107b3b9/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
index 4391e30..a6a1b8d 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
@@ -133,7 +133,7 @@ public class ExternalShuffleIntegrationSuite {
 
     final Semaphore requestsRemaining = new Semaphore(0);
 
-    ExternalShuffleClient client = new ExternalShuffleClient(clientConf, null, false);
+    ExternalShuffleClient client = new ExternalShuffleClient(clientConf, null, false, 5000);
     client.init(APP_ID);
     client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds,
       new BlockFetchingListener() {
@@ -242,7 +242,7 @@ public class ExternalShuffleIntegrationSuite {
 
   private static void registerExecutor(String executorId, ExecutorShuffleInfo executorInfo)
       throws IOException, InterruptedException {
-    ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false);
+    ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false, 5000);
     client.init(APP_ID);
     client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(),
       executorId, executorInfo);

http://git-wip-us.apache.org/repos/asf/spark/blob/d107b3b9/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
index bf20c57..16bad9f 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
@@ -97,7 +97,7 @@ public class ExternalShuffleSecuritySuite {
     }
 
     ExternalShuffleClient client =
-      new ExternalShuffleClient(testConf, new TestSecretKeyHolder(appId, secretKey), true);
+      new ExternalShuffleClient(testConf, new TestSecretKeyHolder(appId, secretKey), true, 5000);
     client.init(appId);
     // Registration either succeeds or throws an exception.
     client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(), "exec0",

http://git-wip-us.apache.org/repos/asf/spark/blob/d107b3b9/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 84ef57f..615497d 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -303,6 +303,19 @@ package object config {
       .bytesConf(ByteUnit.BYTE)
       .createWithDefault(100 * 1024 * 1024)
 
+  private[spark] val SHUFFLE_REGISTRATION_TIMEOUT =
+    ConfigBuilder("spark.shuffle.registration.timeout")
+      .doc("Timeout in milliseconds for registration to the external shuffle service.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefault(5000)
+
+  private[spark] val SHUFFLE_REGISTRATION_MAX_ATTEMPTS =
+    ConfigBuilder("spark.shuffle.registration.maxAttempts")
+      .doc("When we fail to register to the external shuffle service, we will " +
+        "retry for maxAttempts times.")
+      .intConf
+      .createWithDefault(3)
+
   private[spark] val REDUCER_MAX_REQ_SIZE_SHUFFLE_TO_MEM =
     ConfigBuilder("spark.reducer.maxReqSizeShuffleToMem")
       .doc("The blocks of a shuffle request will be fetched to disk when size of the request is " +

http://git-wip-us.apache.org/repos/asf/spark/blob/d107b3b9/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 1689baa..74be703 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -31,7 +31,7 @@ import scala.util.control.NonFatal
 
 import org.apache.spark._
 import org.apache.spark.executor.{DataReadMethod, ShuffleWriteMetrics}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.memory.{MemoryManager, MemoryMode}
 import org.apache.spark.network._
 import org.apache.spark.network.buffer.ManagedBuffer
@@ -174,7 +174,8 @@ private[spark] class BlockManager(
   // standard BlockTransferService to directly connect to other Executors.
   private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
     val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
-    new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled())
+    new ExternalShuffleClient(transConf, securityManager,
+      securityManager.isAuthenticationEnabled(), conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT))
   } else {
     blockTransferService
   }
@@ -254,7 +255,7 @@ private[spark] class BlockManager(
       diskBlockManager.subDirsPerLocalDir,
       shuffleManager.getClass.getName)
 
-    val MAX_ATTEMPTS = 3
+    val MAX_ATTEMPTS = conf.get(config.SHUFFLE_REGISTRATION_MAX_ATTEMPTS)
     val SLEEP_TIME_SECS = 5
 
     for (i <- 1 to MAX_ATTEMPTS) {

http://git-wip-us.apache.org/repos/asf/spark/blob/d107b3b9/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 9d52b48..88f1829 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -20,13 +20,15 @@ package org.apache.spark.storage
 import java.io.File
 import java.nio.ByteBuffer
 
+import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
 import scala.concurrent.Future
-import scala.language.implicitConversions
-import scala.language.postfixOps
+import scala.language.{implicitConversions, postfixOps}
 import scala.reflect.ClassTag
 
+import org.apache.commons.lang3.RandomUtils
 import org.mockito.{Matchers => mc}
 import org.mockito.Mockito.{mock, times, verify, when}
 import org.scalatest._
@@ -38,10 +40,13 @@ import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.executor.DataReadMethod
 import org.apache.spark.internal.config._
 import org.apache.spark.memory.UnifiedMemoryManager
-import org.apache.spark.network.{BlockDataManager, BlockTransferService}
+import org.apache.spark.network.{BlockDataManager, BlockTransferService, TransportContext}
 import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
-import org.apache.spark.network.netty.NettyBlockTransferService
+import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
+import org.apache.spark.network.netty.{NettyBlockTransferService, SparkTransportConf}
+import org.apache.spark.network.server.{NoOpRpcHandler, TransportServer, TransportServerBootstrap}
 import org.apache.spark.network.shuffle.BlockFetchingListener
+import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, RegisterExecutor}
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.security.{CryptoStreamUtils, EncryptionFunSuite}
@@ -1281,6 +1286,61 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(master.getLocations("item").isEmpty)
   }
 
+  test("SPARK-20640: Shuffle registration timeout and maxAttempts conf are working") {
+    val tryAgainMsg = "test_spark_20640_try_again"
+    // a server which delays response 50ms and must try twice for success.
+    def newShuffleServer(port: Int): (TransportServer, Int) = {
+      val attempts = new mutable.HashMap[String, Int]()
+      val handler = new NoOpRpcHandler {
+        override def receive(
+            client: TransportClient,
+            message: ByteBuffer,
+            callback: RpcResponseCallback): Unit = {
+          val msgObj = BlockTransferMessage.Decoder.fromByteBuffer(message)
+          msgObj match {
+            case exec: RegisterExecutor =>
+              Thread.sleep(50)
+              val attempt = attempts.getOrElse(exec.execId, 0) + 1
+              attempts(exec.execId) = attempt
+              if (attempt < 2) {
+                callback.onFailure(new Exception(tryAgainMsg))
+                return
+              }
+              callback.onSuccess(ByteBuffer.wrap(new Array[Byte](0)))
+          }
+        }
+      }
+
+      val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores = 0)
+      val transCtx = new TransportContext(transConf, handler, true)
+      (transCtx.createServer(port, Seq.empty[TransportServerBootstrap].asJava), port)
+    }
+    val candidatePort = RandomUtils.nextInt(1024, 65536)
+    val (server, shufflePort) = Utils.startServiceOnPort(candidatePort,
+      newShuffleServer, conf, "ShuffleServer")
+
+    conf.set("spark.shuffle.service.enabled", "true")
+    conf.set("spark.shuffle.service.port", shufflePort.toString)
+    conf.set(SHUFFLE_REGISTRATION_TIMEOUT.key, "40")
+    conf.set(SHUFFLE_REGISTRATION_MAX_ATTEMPTS.key, "1")
+    var e = intercept[SparkException]{
+      makeBlockManager(8000, "executor1")
+    }.getMessage
+    assert(e.contains("TimeoutException"))
+
+    conf.set(SHUFFLE_REGISTRATION_TIMEOUT.key, "1000")
+    conf.set(SHUFFLE_REGISTRATION_MAX_ATTEMPTS.key, "1")
+    e = intercept[SparkException]{
+      makeBlockManager(8000, "executor2")
+    }.getMessage
+    assert(e.contains(tryAgainMsg))
+
+    conf.set(SHUFFLE_REGISTRATION_TIMEOUT.key, "1000")
+    conf.set(SHUFFLE_REGISTRATION_MAX_ATTEMPTS.key, "2")
+    makeBlockManager(8000, "executor3")
+    server.close()
+  }
+
   class MockBlockTransferService(val maxFailures: Int) extends BlockTransferService {
     var numCalls = 0
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d107b3b9/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index c146474..f1c6d04 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -639,6 +639,20 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
+  <td><code>spark.shuffle.registration.timeout</code></td>
+  <td>5000</td>
+  <td>
+    Timeout in milliseconds for registration to the external shuffle service.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.shuffle.registration.maxAttempts</code></td>
+  <td>3</td>
+  <td>
+    When we fail to register to the external shuffle service, we will retry for maxAttempts times.
+  </td>
+</tr>
+<tr>
   <td><code>spark.io.encryption.enabled</code></td>
   <td>false</td>
   <td>

http://git-wip-us.apache.org/repos/asf/spark/blob/d107b3b9/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 871685c..7dd42c4 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -29,6 +29,7 @@ import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
 import org.apache.mesos.SchedulerDriver
 
 import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState}
+import org.apache.spark.internal.config
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
 import org.apache.spark.rpc.RpcEndpointAddress
@@ -150,7 +151,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     new MesosExternalShuffleClient(
       SparkTransportConf.fromSparkConf(conf, "shuffle"),
       securityManager,
-      securityManager.isAuthenticationEnabled())
+      securityManager.isAuthenticationEnabled(),
+      conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT))
   }
 
   private var nextMesosTaskId = 0


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org