Posted to commits@spark.apache.org by js...@apache.org on 2017/08/31 01:26:34 UTC

spark git commit: [SPARK-17321][YARN] Avoid writing shuffle metadata to disk if NM recovery is disabled

Repository: spark
Updated Branches:
  refs/heads/master cd5d0f337 -> 4482ff23a


[SPARK-17321][YARN] Avoid writing shuffle metadata to disk if NM recovery is disabled

In the current code, even if NM recovery is not enabled, `YarnShuffleService` writes shuffle metadata to the first NM local dir; if that dir sits on a bad disk, `YarnShuffleService` fails to start. To solve this, on the Spark side, if NM recovery is not enabled Spark no longer persists this data into leveldb. The YARN shuffle service can still serve shuffle blocks in that case; it only loses the ability to recover state, which is fine because the failure of the NM kills the containers, and hence the applications, anyway.
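
To make the new control flow concrete, here is a minimal sketch of the post-patch initialization, simplified from the `YarnShuffleService` diff below and reusing its names (not the verbatim method body):

  // Simplified sketch of serviceInit() after this patch; _recoveryPath is
  // non-null only when the NM called setRecoveryPath(), i.e. when NM
  // recovery is enabled.
  if (_recoveryPath != null) {
    // Recovery on: back executor registrations with a leveldb under the
    // NM recovery path so they survive an NM restart.
    registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME);
  }
  // Recovery off: registeredExecutorFile stays null, so the handler keeps
  // executor state in memory only and nothing is written to a local dir.
  blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile);

  if (authEnabled) {
    secretManager = new ShuffleSecretManager();
    if (_recoveryPath != null) {
      loadSecretsFromDb();  // persist/reload secrets only when recoverable
    }
    bootstraps.add(new AuthServerBootstrap(transportConf, secretManager));
  }

The design choice is that `_recoveryPath` doubles as the switch: the NM only calls `setRecoveryPath()` when NM recovery is enabled, so a null path means no on-disk state is ever created.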

Tested on a local cluster with NM recovery both off and on, to verify whether the recovery folder is created. A MiniCluster UT isn't added because in MiniCluster the NM always sets its port to 0, while NM recovery requires a non-ephemeral port.
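
For context (an illustrative aside, not part of this patch): NM recovery is controlled by standard Hadoop properties, and it requires the NM to bind to a fixed port, which MiniYARNCluster never does. Values below are examples only:

  // Hypothetical NM-side settings needed to exercise this path on a real
  // cluster (standard YarnConfiguration keys).
  YarnConfiguration conf = new YarnConfiguration();
  conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);  // yarn.nodemanager.recovery.enabled
  conf.set(YarnConfiguration.NM_RECOVERY_DIR, "/var/lib/hadoop-yarn/nm-recovery");
  // MiniYARNCluster forces port 0 here, and NM recovery rejects ephemeral
  // ports; hence no MiniCluster UT for this change.
  conf.set(YarnConfiguration.NM_ADDRESS, "0.0.0.0:45454");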

Author: jerryshao <ss...@hortonworks.com>

Closes #19032 from jerryshao/SPARK-17321.

Change-Id: I8f2fe73d175e2ad2c4e380caede3873e0192d027


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4482ff23
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4482ff23
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4482ff23

Branch: refs/heads/master
Commit: 4482ff23ad984335b0d477100ac0815d5db8d532
Parents: cd5d0f3
Author: jerryshao <ss...@hortonworks.com>
Authored: Thu Aug 31 09:26:20 2017 +0800
Committer: jerryshao <ss...@hortonworks.com>
Committed: Thu Aug 31 09:26:20 2017 +0800

----------------------------------------------------------------------
 .../spark/network/yarn/YarnShuffleService.java  | 82 ++++++++++----------
 .../yarn/YarnShuffleIntegrationSuite.scala      | 33 ++++++--
 .../network/yarn/YarnShuffleServiceSuite.scala  | 32 +++++---
 3 files changed, 86 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4482ff23/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
----------------------------------------------------------------------
diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index cd67eb2..d8b2ed6 100644
--- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -29,6 +29,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -160,7 +161,9 @@ public class YarnShuffleService extends AuxiliaryService {
       // If we don't find one, then we choose a file to use to save the state next time.  Even if
       // an application was stopped while the NM was down, we expect yarn to call stopApplication()
       // when it comes back
-      registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME);
+      if (_recoveryPath != null) {
+        registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME);
+      }
 
       TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
       blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile);
@@ -170,7 +173,10 @@ public class YarnShuffleService extends AuxiliaryService {
       List<TransportServerBootstrap> bootstraps = Lists.newArrayList();
       boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
       if (authEnabled) {
-        createSecretManager();
+        secretManager = new ShuffleSecretManager();
+        if (_recoveryPath != null) {
+          loadSecretsFromDb();
+        }
         bootstraps.add(new AuthServerBootstrap(transportConf, secretManager));
       }
 
@@ -194,13 +200,12 @@ public class YarnShuffleService extends AuxiliaryService {
     }
   }
 
-  private void createSecretManager() throws IOException {
-    secretManager = new ShuffleSecretManager();
+  private void loadSecretsFromDb() throws IOException {
     secretsFile = initRecoveryDb(SECRETS_RECOVERY_FILE_NAME);
 
     // Make sure this is protected in case its not in the NM recovery dir
     FileSystem fs = FileSystem.getLocal(_conf);
-    fs.mkdirs(new Path(secretsFile.getPath()), new FsPermission((short)0700));
+    fs.mkdirs(new Path(secretsFile.getPath()), new FsPermission((short) 0700));
 
     db = LevelDBProvider.initLevelDB(secretsFile, CURRENT_VERSION, mapper);
     logger.info("Recovery location is: " + secretsFile.getPath());
@@ -317,10 +322,10 @@ public class YarnShuffleService extends AuxiliaryService {
   }
 
   /**
-   * Set the recovery path for shuffle service recovery when NM is restarted. The method will be
-   * overrode and called when Hadoop version is 2.5+ and NM recovery is enabled, otherwise we
-   * have to manually call this to set our own recovery path.
+   * Set the recovery path for shuffle service recovery when the NM is restarted. This will be
+   * called by the NM if NM recovery is enabled.
    */
+  @Override
   public void setRecoveryPath(Path recoveryPath) {
     _recoveryPath = recoveryPath;
   }
@@ -334,53 +339,44 @@ public class YarnShuffleService extends AuxiliaryService {
 
   /**
    * Figure out the recovery path and handle moving the DB if YARN NM recovery gets enabled
-   * when it previously was not. If YARN NM recovery is enabled it uses that path, otherwise
-   * it will uses a YARN local dir.
+   * and a DB created by an older version of the shuffle service exists in the NM local dir.
    */
   protected File initRecoveryDb(String dbName) {
-    if (_recoveryPath != null) {
-        File recoveryFile = new File(_recoveryPath.toUri().getPath(), dbName);
-        if (recoveryFile.exists()) {
-          return recoveryFile;
-        }
+    Preconditions.checkNotNull(_recoveryPath,
+      "recovery path should not be null if NM recovery is enabled");
+
+    File recoveryFile = new File(_recoveryPath.toUri().getPath(), dbName);
+    if (recoveryFile.exists()) {
+      return recoveryFile;
     }
+
     // db doesn't exist in recovery path go check local dirs for it
     String[] localDirs = _conf.getTrimmedStrings("yarn.nodemanager.local-dirs");
     for (String dir : localDirs) {
       File f = new File(new Path(dir).toUri().getPath(), dbName);
       if (f.exists()) {
-        if (_recoveryPath == null) {
-          // If NM recovery is not enabled, we should specify the recovery path using NM local
-          // dirs, which is compatible with the old code.
-          _recoveryPath = new Path(dir);
-          return f;
-        } else {
-          // If the recovery path is set then either NM recovery is enabled or another recovery
-          // DB has been initialized. If NM recovery is enabled and had set the recovery path
-          // make sure to move all DBs to the recovery path from the old NM local dirs.
-          // If another DB was initialized first just make sure all the DBs are in the same
-          // location.
-          Path newLoc = new Path(_recoveryPath, dbName);
-          Path copyFrom = new Path(f.toURI());
-          if (!newLoc.equals(copyFrom)) {
-            logger.info("Moving " + copyFrom + " to: " + newLoc);
-            try {
-              // The move here needs to handle moving non-empty directories across NFS mounts
-              FileSystem fs = FileSystem.getLocal(_conf);
-              fs.rename(copyFrom, newLoc);
-            } catch (Exception e) {
-              // Fail to move recovery file to new path, just continue on with new DB location
-              logger.error("Failed to move recovery file {} to the path {}",
-                dbName, _recoveryPath.toString(), e);
-            }
+        // If the recovery path is set then either NM recovery is enabled or another recovery
+        // DB has been initialized. If NM recovery is enabled and had set the recovery path
+        // make sure to move all DBs to the recovery path from the old NM local dirs.
+        // If another DB was initialized first just make sure all the DBs are in the same
+        // location.
+        Path newLoc = new Path(_recoveryPath, dbName);
+        Path copyFrom = new Path(f.toURI());
+        if (!newLoc.equals(copyFrom)) {
+          logger.info("Moving " + copyFrom + " to: " + newLoc);
+          try {
+            // The move here needs to handle moving non-empty directories across NFS mounts
+            FileSystem fs = FileSystem.getLocal(_conf);
+            fs.rename(copyFrom, newLoc);
+          } catch (Exception e) {
+            // Failed to move the recovery file to the new path; just continue with the new DB location
+            logger.error("Failed to move recovery file {} to the path {}",
+              dbName, _recoveryPath.toString(), e);
           }
-          return new File(newLoc.toUri().getPath());
         }
+        return new File(newLoc.toUri().getPath());
       }
     }
-    if (_recoveryPath == null) {
-      _recoveryPath = new Path(localDirs[0]);
-    }
 
     return new File(_recoveryPath.toUri().getPath(), dbName);
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/4482ff23/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
index 13472f2..01db796 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
@@ -70,11 +70,18 @@ class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite {
     val finalState = runSpark(
       false,
       mainClassName(YarnExternalShuffleDriver.getClass),
-      appArgs = Seq(result.getAbsolutePath(), registeredExecFile.getAbsolutePath),
+      appArgs = if (registeredExecFile != null) {
+        Seq(result.getAbsolutePath, registeredExecFile.getAbsolutePath)
+      } else {
+        Seq(result.getAbsolutePath)
+      },
       extraConf = extraSparkConf()
     )
     checkResult(finalState, result)
-    assert(YarnTestAccessor.getRegisteredExecutorFile(shuffleService).exists())
+
+    if (registeredExecFile != null) {
+      assert(YarnTestAccessor.getRegisteredExecutorFile(shuffleService).exists())
+    }
   }
 }
 
@@ -105,7 +112,7 @@ private object YarnExternalShuffleDriver extends Logging with Matchers {
   val WAIT_TIMEOUT_MILLIS = 10000
 
   def main(args: Array[String]): Unit = {
-    if (args.length != 2) {
+    if (args.length > 2) {
       // scalastyle:off println
       System.err.println(
         s"""
@@ -121,10 +128,16 @@ private object YarnExternalShuffleDriver extends Logging with Matchers {
       .setAppName("External Shuffle Test"))
     val conf = sc.getConf
     val status = new File(args(0))
-    val registeredExecFile = new File(args(1))
+    val registeredExecFile = if (args.length == 2) {
+      new File(args(1))
+    } else {
+      null
+    }
     logInfo("shuffle service executor file = " + registeredExecFile)
     var result = "failure"
-    val execStateCopy = new File(registeredExecFile.getAbsolutePath + "_dup")
+    val execStateCopy = Option(registeredExecFile).map { file =>
+      new File(file.getAbsolutePath + "_dup")
+    }.orNull
     try {
       val data = sc.parallelize(0 until 100, 10).map { x => (x % 10) -> x }.reduceByKey{ _ + _ }.
         collect().toSet
@@ -132,11 +145,15 @@ private object YarnExternalShuffleDriver extends Logging with Matchers {
       data should be ((0 until 10).map{x => x -> (x * 10 + 450)}.toSet)
       result = "success"
       // only one process can open a leveldb file at a time, so we copy the files
-      FileUtils.copyDirectory(registeredExecFile, execStateCopy)
-      assert(!ShuffleTestAccessor.reloadRegisteredExecutors(execStateCopy).isEmpty)
+      if (registeredExecFile != null && execStateCopy != null) {
+        FileUtils.copyDirectory(registeredExecFile, execStateCopy)
+        assert(!ShuffleTestAccessor.reloadRegisteredExecutors(execStateCopy).isEmpty)
+      }
     } finally {
       sc.stop()
-      FileUtils.deleteDirectory(execStateCopy)
+      if (execStateCopy != null) {
+        FileUtils.deleteDirectory(execStateCopy)
+      }
       Files.write(result, status, StandardCharsets.UTF_8)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/4482ff23/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
index a58784f..268f4bd 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
@@ -44,6 +44,8 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
   private[yarn] var yarnConfig: YarnConfiguration = null
   private[yarn] val SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager"
 
+  private var recoveryLocalDir: File = _
+
   override def beforeEach(): Unit = {
     super.beforeEach()
     yarnConfig = new YarnConfiguration()
@@ -54,6 +56,8 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
     yarnConfig.setBoolean(YarnShuffleService.STOP_ON_FAILURE_KEY, true)
     val localDir = Utils.createTempDir()
     yarnConfig.set(YarnConfiguration.NM_LOCAL_DIRS, localDir.getAbsolutePath)
+
+    recoveryLocalDir = Utils.createTempDir()
   }
 
   var s1: YarnShuffleService = null
@@ -81,6 +85,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
 
   test("executor state kept across NM restart") {
     s1 = new YarnShuffleService
+    s1.setRecoveryPath(new Path(recoveryLocalDir.toURI))
     // set auth to true to test the secrets recovery
     yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, true)
     s1.init(yarnConfig)
@@ -123,6 +128,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
     // now we pretend the shuffle service goes down, and comes back up
     s1.stop()
     s2 = new YarnShuffleService
+    s2.setRecoveryPath(new Path(recoveryLocalDir.toURI))
     s2.init(yarnConfig)
     s2.secretsFile should be (secretsFile)
     s2.registeredExecutorFile should be (execStateFile)
@@ -140,6 +146,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
     // Act like the NM restarts one more time
     s2.stop()
     s3 = new YarnShuffleService
+    s3.setRecoveryPath(new Path(recoveryLocalDir.toURI))
     s3.init(yarnConfig)
     s3.registeredExecutorFile should be (execStateFile)
     s3.secretsFile should be (secretsFile)
@@ -156,6 +163,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
 
   test("removed applications should not be in registered executor file") {
     s1 = new YarnShuffleService
+    s1.setRecoveryPath(new Path(recoveryLocalDir.toURI))
     yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, false)
     s1.init(yarnConfig)
     val secretsFile = s1.secretsFile
@@ -190,6 +198,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
 
   test("shuffle service should be robust to corrupt registered executor file") {
     s1 = new YarnShuffleService
+    s1.setRecoveryPath(new Path(recoveryLocalDir.toURI))
     s1.init(yarnConfig)
     val app1Id = ApplicationId.newInstance(0, 1)
     val app1Data = makeAppInfo("user", app1Id)
@@ -215,6 +224,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
     out.close()
 
     s2 = new YarnShuffleService
+    s2.setRecoveryPath(new Path(recoveryLocalDir.toURI))
     s2.init(yarnConfig)
     s2.registeredExecutorFile should be (execStateFile)
 
@@ -234,6 +244,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
 
     // another stop & restart should be fine though (eg., we recover from previous corruption)
     s3 = new YarnShuffleService
+    s3.setRecoveryPath(new Path(recoveryLocalDir.toURI))
     s3.init(yarnConfig)
     s3.registeredExecutorFile should be (execStateFile)
     val handler3 = s3.blockHandler
@@ -254,14 +265,6 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
     s1.init(yarnConfig)
     s1._recoveryPath should be (recoveryPath)
     s1.stop()
-
-    // Test recovery path is set inside the shuffle service, this will be happened when NM
-    // recovery is not enabled or there's no NM recovery (Hadoop 2.5-).
-    s2 = new YarnShuffleService
-    s2.init(yarnConfig)
-    s2._recoveryPath should be
-      (new Path(yarnConfig.getTrimmedStrings("yarn.nodemanager.local-dirs")(0)))
-    s2.stop()
   }
 
   test("moving recovery file from NM local dir to recovery path") {
@@ -271,6 +274,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
     // Simulate s1 is running on old version of Hadoop in which recovery file is in the NM local
     // dir.
     s1 = new YarnShuffleService
+    s1.setRecoveryPath(new Path(yarnConfig.getTrimmedStrings(YarnConfiguration.NM_LOCAL_DIRS)(0)))
     // set auth to true to test the secrets recovery
     yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, true)
     s1.init(yarnConfig)
@@ -308,7 +312,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
 
     // Simulate s2 is running on Hadoop 2.5+ with NM recovery is enabled.
     assert(execStateFile.exists())
-    val recoveryPath = new Path(Utils.createTempDir().toURI)
+    val recoveryPath = new Path(recoveryLocalDir.toURI)
     s2 = new YarnShuffleService
     s2.setRecoveryPath(recoveryPath)
     s2.init(yarnConfig)
@@ -347,10 +351,10 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
     // Set up a read-only local dir.
     val roDir = Utils.createTempDir()
     Files.setPosixFilePermissions(roDir.toPath(), EnumSet.of(OWNER_READ, OWNER_EXECUTE))
-    yarnConfig.set(YarnConfiguration.NM_LOCAL_DIRS, roDir.getAbsolutePath())
 
     // Try to start the shuffle service, it should fail.
     val service = new YarnShuffleService()
+    service.setRecoveryPath(new Path(roDir.toURI))
 
     try {
       val error = intercept[ServiceStateException] {
@@ -369,4 +373,12 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
     new ApplicationInitializationContext(user, appId, secret)
   }
 
+  test("recovery db should not be created if NM recovery is not enabled") {
+    s1 = new YarnShuffleService
+    s1.init(yarnConfig)
+    s1._recoveryPath should be (null)
+    s1.registeredExecutorFile should be (null)
+    s1.secretsFile should be (null)
+  }
+
 }

