Posted to commits@spark.apache.org by ir...@apache.org on 2019/03/19 21:16:56 UTC
[spark] branch master updated: [SPARK-26288][CORE] add initRegisteredExecutorsDB
This is an automated email from the ASF dual-hosted git repository.
irashid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8b0aa59 [SPARK-26288][CORE] add initRegisteredExecutorsDB
8b0aa59 is described below
commit 8b0aa59218c209d39cbba5959302d8668b885cf6
Author: weixiuli <we...@jd.com>
AuthorDate: Tue Mar 19 16:16:43 2019 -0500
[SPARK-26288][CORE] add initRegisteredExecutorsDB
## What changes were proposed in this pull request?
Spark on YARN uses a DB (https://github.com/apache/spark/pull/7943) to record RegisteredExecutors information, which can be reloaded and reused when the ExternalShuffleService is restarted.
In standalone mode and in Spark on Kubernetes, however, RegisteredExecutors information is not recorded at all, so it is lost whenever the ExternalShuffleService restarts.
This patch fixes that by persisting the same information in those modes.
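As a minimal sketch of how a standalone deployment would opt in (the values and the local directory path are illustrative, not taken from this patch):

```scala
import org.apache.spark.SparkConf

// Illustrative only: spark.local.dir must point at a stable directory,
// since the registeredExecutors.ldb file is created underneath it.
val conf = new SparkConf()
  .set("spark.shuffle.service.enabled", "true")    // run the external shuffle service
  .set("spark.shuffle.service.db.enabled", "true") // persist executor registrations (the new flag)
  .set("spark.local.dir", "/var/spark/local")      // hypothetical path
```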
## How was this patch tested?
New unit tests.
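For reference, the new suite can be run on its own with the usual sbt pattern, e.g.:

```
build/sbt "core/testOnly *ExternalShuffleServiceDbSuite"
```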
Closes #23393 from weixiuli/SPARK-26288.
Authored-by: weixiuli <we...@jd.com>
Signed-off-by: Imran Rashid <ir...@cloudera.com>
---
.../shuffle/ExternalShuffleBlockHandler.java | 5 +
core/pom.xml | 7 ++
.../spark/deploy/ExternalShuffleService.scala | 25 +++-
.../org/apache/spark/deploy/worker/Worker.scala | 9 ++
.../org/apache/spark/internal/config/package.scala | 7 ++
.../deploy/ExternalShuffleServiceDbSuite.scala | 140 +++++++++++++++++++++
.../apache/spark/deploy/worker/WorkerSuite.scala | 51 +++++++-
docs/spark-standalone.md | 11 ++
8 files changed, 253 insertions(+), 2 deletions(-)
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
index b25e48a..70dcc8b 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -66,6 +66,11 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
new ExternalShuffleBlockResolver(conf, registeredExecutorFile));
}
+ @VisibleForTesting
+ public ExternalShuffleBlockResolver getBlockResolver() {
+ return blockManager;
+ }
+
/** Enables mocking out the StreamManager and BlockManager. */
@VisibleForTesting
public ExternalShuffleBlockHandler(
diff --git a/core/pom.xml b/core/pom.xml
index b9f78b2..45bda44 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -372,6 +372,13 @@
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-network-shuffle_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
<!--
This spark-tags test-dep is needed even though it isn't used in this module, otherwise testing-cmds that exclude
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index 12ed189..28279fc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -17,6 +17,7 @@
package org.apache.spark.deploy
+import java.io.File
import java.util.concurrent.CountDownLatch
import scala.collection.JavaConverters._
@@ -49,6 +50,8 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
private val enabled = sparkConf.get(config.SHUFFLE_SERVICE_ENABLED)
private val port = sparkConf.get(config.SHUFFLE_SERVICE_PORT)
+ private val registeredExecutorsDB = "registeredExecutors.ldb"
+
private val transportConf =
SparkTransportConf.fromSparkConf(sparkConf, "shuffle", numUsableCores = 0)
private val blockHandler = newShuffleBlockHandler(transportConf)
@@ -58,9 +61,29 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
private val shuffleServiceSource = new ExternalShuffleServiceSource
+ protected def findRegisteredExecutorsDBFile(dbName: String): File = {
+ val localDirs = sparkConf.getOption("spark.local.dir").map(_.split(",")).getOrElse(Array())
+ if (localDirs.length >= 1) {
+ new File(localDirs.find(new File(_, dbName).exists()).getOrElse(localDirs(0)), dbName)
+ } else {
+ logWarning(s"'spark.local.dir' should be set first when we use db in " +
+ s"ExternalShuffleService. Note that this only affects standalone mode.")
+ null
+ }
+ }
+
+ /** Get the block handler. */
+ def getBlockHandler: ExternalShuffleBlockHandler = {
+ blockHandler
+ }
+
/** Create a new shuffle block handler. Factored out for subclasses to override. */
protected def newShuffleBlockHandler(conf: TransportConf): ExternalShuffleBlockHandler = {
- new ExternalShuffleBlockHandler(conf, null)
+ if (sparkConf.get(config.SHUFFLE_SERVICE_DB_ENABLED) && enabled) {
+ new ExternalShuffleBlockHandler(conf, findRegisteredExecutorsDBFile(registeredExecutorsDB))
+ } else {
+ new ExternalShuffleBlockHandler(conf, null)
+ }
}
/** Starts the external shuffle service if the user has configured us to. */
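For readers skimming the hunk above: findRegisteredExecutorsDBFile prefers a local dir that already contains the DB file, and otherwise falls back to the first configured dir. A self-contained sketch of the same lookup (simplified, with the null return replaced by Option purely for illustration):

```scala
import java.io.File

// Simplified restatement of the lookup: reuse a directory that already
// holds the DB file, otherwise create it under the first configured dir.
def locateDbFile(localDirs: Array[String], dbName: String): Option[File] = {
  if (localDirs.isEmpty) {
    None // mirrors the logWarning + null path when spark.local.dir is unset
  } else {
    val dir = localDirs.find(d => new File(d, dbName).exists()).getOrElse(localDirs(0))
    Some(new File(dir, dbName))
  }
}
```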
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 52892c3..a0664b3f 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -466,6 +466,15 @@ private[deploy] class Worker(
}.foreach { dir =>
logInfo(s"Removing directory: ${dir.getPath}")
Utils.deleteRecursively(dir)
+
+ // Also remove the application's entry from the external shuffle service's DB when
+ // spark.shuffle.service.db.enabled=true. If an application is stopped while the
+ // external shuffle service is down, it leaves a stale entry in the DB, and that
+ // entry should be removed here.
+ if (conf.get(config.SHUFFLE_SERVICE_DB_ENABLED) &&
+ conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
+ shuffleService.applicationRemoved(dir.getName)
+ }
}
}(cleanupThreadExecutor)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 2d664bb..758f605 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -359,6 +359,13 @@ package object config {
private[spark] val SHUFFLE_SERVICE_ENABLED =
ConfigBuilder("spark.shuffle.service.enabled").booleanConf.createWithDefault(false)
+ private[spark] val SHUFFLE_SERVICE_DB_ENABLED =
+ ConfigBuilder("spark.shuffle.service.db.enabled")
+ .doc("Whether to use db in ExternalShuffleService. Note that this only affects " +
+ "standalone mode.")
+ .booleanConf
+ .createWithDefault(true)
+
private[spark] val SHUFFLE_SERVICE_PORT =
ConfigBuilder("spark.shuffle.service.port").intConf.createWithDefault(7337)
diff --git a/core/src/test/scala/org/apache/spark/deploy/ExternalShuffleServiceDbSuite.scala b/core/src/test/scala/org/apache/spark/deploy/ExternalShuffleServiceDbSuite.scala
new file mode 100644
index 0000000..e33c3f8
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/ExternalShuffleServiceDbSuite.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import java.io._
+import java.nio.charset.StandardCharsets
+
+import com.google.common.io.CharStreams
+
+import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
+import org.apache.spark.network.shuffle.{ExternalShuffleBlockHandler, ExternalShuffleBlockResolver}
+import org.apache.spark.network.shuffle.TestShuffleDataContext
+import org.apache.spark.util.Utils
+
+/**
+ * This suite reads block data back after the ExternalShuffleService is restarted
+ * with spark.shuffle.service.db.enabled = true or false. When the flag is false,
+ * the data is expected to be unrecoverable.
+ */
+class ExternalShuffleServiceDbSuite extends SparkFunSuite {
+ val sortBlock0 = "Hello!"
+ val sortBlock1 = "World!"
+ val SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager"
+
+ var sparkConf: SparkConf = _
+ var dataContext: TestShuffleDataContext = _
+
+ var securityManager: SecurityManager = _
+ var externalShuffleService: ExternalShuffleService = _
+ var blockHandler: ExternalShuffleBlockHandler = _
+ var blockResolver: ExternalShuffleBlockResolver = _
+
+ override def beforeAll() {
+ super.beforeAll()
+ sparkConf = new SparkConf()
+ sparkConf.set("spark.shuffle.service.enabled", "true")
+ sparkConf.set("spark.local.dir", System.getProperty("java.io.tmpdir"))
+ Utils.loadDefaultSparkProperties(sparkConf, null)
+ securityManager = new SecurityManager(sparkConf)
+
+ dataContext = new TestShuffleDataContext(2, 5)
+ dataContext.create()
+ // Write some sort data.
+ dataContext.insertSortShuffleData(0, 0,
+ Array[Array[Byte]](sortBlock0.getBytes(StandardCharsets.UTF_8),
+ sortBlock1.getBytes(StandardCharsets.UTF_8)))
+ registerExecutor()
+ }
+
+ override def afterAll() {
+ try {
+ dataContext.cleanup()
+ } finally {
+ super.afterAll()
+ }
+ }
+
+ def registerExecutor(): Unit = {
+ try {
+ sparkConf.set("spark.shuffle.service.db.enabled", "true")
+ externalShuffleService = new ExternalShuffleService(sparkConf, securityManager)
+
+ // external Shuffle Service start
+ externalShuffleService.start()
+ blockHandler = externalShuffleService.getBlockHandler
+ blockResolver = blockHandler.getBlockResolver
+ blockResolver.registerExecutor("app0", "exec0", dataContext.createExecutorInfo(SORT_MANAGER))
+ } finally {
+ blockHandler.close()
+ // external Shuffle Service stop
+ externalShuffleService.stop()
+ }
+ }
+
+ // The beforeAll ensures the shuffle data was already written, and then
+ // the shuffle service was stopped. Here we restart the shuffle service
+ // and make sure we can read the shuffle data.
+ test("Recover shuffle data with spark.shuffle.service.db.enabled=true after " +
+ "shuffle service restart") {
+ try {
+ sparkConf.set("spark.shuffle.service.db.enabled", "true")
+ externalShuffleService = new ExternalShuffleService(sparkConf, securityManager)
+ // externalShuffleService restart
+ externalShuffleService.start()
+ blockHandler = externalShuffleService.getBlockHandler
+ blockResolver = blockHandler.getBlockResolver
+
+ val block0Stream = blockResolver.getBlockData("app0", "exec0", 0, 0, 0).createInputStream
+ val block0 = CharStreams.toString(new InputStreamReader(block0Stream, StandardCharsets.UTF_8))
+ block0Stream.close()
+ assert(sortBlock0 == block0)
+ // pass
+ } finally {
+ blockHandler.close()
+ // externalShuffleService stop
+ externalShuffleService.stop()
+ }
+
+ }
+
+ // The beforeAll ensures the shuffle data was already written, and then
+ // the shuffle service was stopped. Here we restart the shuffle service,
+ // but this time we can't read the shuffle data.
+ test("Can't recover shuffle data with spark.shuffle.service.db.enabled=false after" +
+ " shuffle service restart") {
+ try {
+ sparkConf.set("spark.shuffle.service.db.enabled", "false")
+ externalShuffleService = new ExternalShuffleService(sparkConf, securityManager)
+ // externalShuffleService restart
+ externalShuffleService.start()
+ blockHandler = externalShuffleService.getBlockHandler
+ blockResolver = blockHandler.getBlockResolver
+
+ val error = intercept[RuntimeException] {
+ blockResolver.getBlockData("app0", "exec0", 0, 0, 0).createInputStream
+ }.getMessage
+
+ assert(error.contains("not registered"))
+ } finally {
+ blockHandler.close()
+ // externalShuffleService stop
+ externalShuffleService.stop()
+ }
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
index f6559df..168694c 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
@@ -17,9 +17,12 @@
package org.apache.spark.deploy.worker
+import java.io.{File, IOException}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.function.Supplier
+import scala.concurrent.duration._
+
import org.mockito.{Mock, MockitoAnnotations}
import org.mockito.Answers.RETURNS_SMART_NULLS
import org.mockito.ArgumentMatchers.any
@@ -27,14 +30,16 @@ import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest.{BeforeAndAfter, Matchers}
+import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.deploy.{Command, ExecutorState, ExternalShuffleService}
-import org.apache.spark.deploy.DeployMessages.{DriverStateChanged, ExecutorStateChanged}
+import org.apache.spark.deploy.DeployMessages.{DriverStateChanged, ExecutorStateChanged, WorkDirCleanup}
import org.apache.spark.deploy.master.DriverState
import org.apache.spark.internal.config
import org.apache.spark.internal.config.Worker._
import org.apache.spark.rpc.{RpcAddress, RpcEnv}
+import org.apache.spark.util.Utils
class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
@@ -245,4 +250,48 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
ExecutorStateChanged("app1", 0, ExecutorState.EXITED, None, None))
assert(cleanupCalled.get() == value)
}
+
+ test("WorkDirCleanup cleans app dirs and shuffle metadata when " +
+ "spark.shuffle.service.db.enabled=true") {
+ testWorkDirCleanupAndRemoveMetadataWithConfig(true)
+ }
+
+ test("WorkdDirCleanup cleans only app dirs when" +
+ "spark.shuffle.service.db.enabled=false") {
+ testWorkDirCleanupAndRemoveMetadataWithConfig(false)
+ }
+
+ private def testWorkDirCleanupAndRemoveMetadataWithConfig(dbCleanupEnabled: Boolean) = {
+ val conf = new SparkConf().set("spark.shuffle.service.db.enabled", dbCleanupEnabled.toString)
+ conf.set("spark.worker.cleanup.appDataTtl", "60")
+ conf.set("spark.shuffle.service.enabled", "true")
+
+ val appId = "app1"
+ val execId = "exec1"
+ val cleanupCalled = new AtomicBoolean(false)
+ when(shuffleService.applicationRemoved(any[String])).thenAnswer(new Answer[Unit] {
+ override def answer(invocations: InvocationOnMock): Unit = {
+ cleanupCalled.set(true)
+ }
+ })
+ val externalShuffleServiceSupplier = new Supplier[ExternalShuffleService] {
+ override def get: ExternalShuffleService = shuffleService
+ }
+ val worker = makeWorker(conf, externalShuffleServiceSupplier)
+ val workDir = Utils.createTempDir(namePrefix = "work")
+ // initialize the worker's work directory
+ worker.workDir = workDir
+ // Create the executor's working directory
+ val executorDir = new File(worker.workDir, appId + "/" + execId)
+
+ if (!executorDir.exists && !executorDir.mkdirs()) {
+ throw new IOException("Failed to create directory " + executorDir)
+ }
+ executorDir.setLastModified(System.currentTimeMillis - (1000 * 120))
+ worker.receive(WorkDirCleanup)
+ eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+ assert(!executorDir.exists())
+ assert(cleanupCalled.get() == dbCleanupEnabled)
+ }
+ }
}
diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md
index 60b84d3..3400da4 100644
--- a/docs/spark-standalone.md
+++ b/docs/spark-standalone.md
@@ -240,6 +240,7 @@ SPARK_WORKER_OPTS supports the following system properties:
<td>
Enable periodic cleanup of worker / application directories. Note that this only affects standalone
mode, as YARN works differently. Only the directories of stopped applications are cleaned up.
+ This should be enabled if <code>spark.shuffle.service.db.enabled</code> is "true".
</td>
</tr>
<tr>
@@ -261,6 +262,16 @@ SPARK_WORKER_OPTS supports the following system properties:
</td>
</tr>
<tr>
+ <td><code>spark.shuffle.service.db.enabled</code></td>
+ <td>true</td>
+ <td>
+ Store external shuffle service state on local disk so that when the external shuffle service is restarted, it
+ automatically reloads information on current executors. This only affects standalone mode (YARN always has this
+ behavior enabled). You should also enable <code>spark.worker.cleanup.enabled</code> to ensure that the state
+ eventually gets cleaned up. This config may be removed in the future.
+ </td>
+</tr>
+<tr>
<td><code>spark.storage.cleanupFilesAfterExecutorExit</code></td>
<td>true</td>
<td>
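For standalone operators, a hedged example of pairing the new flag with worker cleanup through SPARK_WORKER_OPTS, per the docs above (values illustrative):

```
SPARK_WORKER_OPTS="-Dspark.shuffle.service.db.enabled=true -Dspark.worker.cleanup.enabled=true"
```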