Posted to commits@spark.apache.org by tg...@apache.org on 2022/09/21 13:05:34 UTC

[spark] branch master updated: [SPARK-40490][YARN][TESTS] Ensure `YarnShuffleIntegrationSuite` tests registeredExecFile reload scenarios

This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e6699570bec [SPARK-40490][YARN][TESTS] Ensure `YarnShuffleIntegrationSuite` tests registeredExecFile reload scenarios
e6699570bec is described below

commit e6699570becadb91695572bca5adc1605dc1b2a8
Author: yangjie01 <ya...@baidu.com>
AuthorDate: Wed Sep 21 08:05:17 2022 -0500

    [SPARK-40490][YARN][TESTS] Ensure `YarnShuffleIntegrationSuite` tests registeredExecFile reload scenarios
    
    ### What changes were proposed in this pull request?
    After SPARK-17321, `YarnShuffleService` persists data to, and reloads data from, the local shuffle state db only when the Yarn NodeManager is started with `YarnConfiguration#NM_RECOVERY_ENABLED = true`.
    
    `YarnShuffleIntegrationSuite` does not set `YarnConfiguration#NM_RECOVERY_ENABLED`, and the default value of that configuration is false, so `YarnShuffleIntegrationSuite` neither triggers data persistence to the db nor verifies the reload of data.
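
    For reference, a minimal sketch of what enabling NM recovery directly would look like in the suite's Yarn configuration, assuming `org.apache.hadoop.yarn.conf.YarnConfiguration` is imported and `recoveryDir` is a hypothetical temp directory (the sections below explain why this approach is not taken):

    ```
    // Illustrative only: the PR adds spark.yarn.shuffle.testing instead, see below.
    yarnConfig.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true)
    yarnConfig.set(YarnConfiguration.NM_RECOVERY_DIR, recoveryDir.getAbsolutePath)
    ```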
    
    This PR aims to let `YarnShuffleIntegrationSuite` verify the registeredExecFile reload scenario again. To achieve this goal, it makes the following changes:
    
    1. Add a new undocumented configuration `spark.yarn.shuffle.testing` to `YarnShuffleService`, and initialize `_recoveryPath` when `_recoveryPath == null && spark.yarn.shuffle.testing == true`.
    
    2. Set `spark.yarn.shuffle.testing = true` only in `YarnShuffleIntegrationSuite`, and add an assertion that `registeredExecFile` is not null to ensure the registeredExecFile reload scenario is verified.
    
    ### Why are the changes needed?
    Fix the registeredExecFile reload test scenario.
    
    Why not test by configuring `YarnConfiguration#NM_RECOVERY_ENABLED` as true?
    
    This configuration has been tried:
    
    **Hadoop 3.3.4**
    
    ```
    build/mvn clean install -pl resource-managers/yarn -Pyarn -Dtest=none -DwildcardSuites=org.apache.spark.deploy.yarn.YarnShuffleIntegrationWithLevelDBBackendSuite -Phadoop-3
    ```
    
    ```
    2022-09-10T11:44:42.1710230Z Cause: java.lang.ClassNotFoundException: org.apache.hadoop.shaded.org.iq80.leveldb.DBException
    2022-09-10T11:44:42.1715234Z at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
    2022-09-10T11:44:42.1719347Z at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
    2022-09-10T11:44:42.1723090Z at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    2022-09-10T11:44:42.1726759Z at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
    2022-09-10T11:44:42.1731028Z at org.apache.hadoop.yarn.server.nodemanager.NodeManager.initAndStartRecoveryStore(NodeManager.java:313)
    2022-09-10T11:44:42.1735424Z at org.apache.hadoop.yarn.server.nodemanager.NodeManager.serviceInit(NodeManager.java:370)
    2022-09-10T11:44:42.1740303Z at org.apache.hadoop.service.AbstractService.init(AbstractService.java:164)
    2022-09-10T11:44:42.1745576Z at org.apache.hadoop.yarn.server.MiniYARNCluster$NodeManagerWrapper.serviceInit(MiniYARNCluster.java:597)
    2022-09-10T11:44:42.1828858Z at org.apache.hadoop.service.AbstractService.init(AbstractService.java:164)
    2022-09-10T11:44:42.1829712Z at org.apache.hadoop.service.CompositeService.serviceInit(CompositeService.java:109)
    2022-09-10T11:44:42.1830633Z at org.apache.hadoop.yarn.server.MiniYARNCluster.serviceInit(MiniYARNCluster.java:327)
    2022-09-10T11:44:42.1831431Z at org.apache.hadoop.service.AbstractService.init(AbstractService.java:164)
    2022-09-10T11:44:42.1832279Z at org.apache.spark.deploy.yarn.BaseYarnClusterSuite.beforeAll(BaseYarnClusterSuite.scala:112)
    ```
    
    **Hadoop 2.7.4**
    
    ```
    build/mvn clean install -pl resource-managers/yarn -Pyarn -Dtest=none -DwildcardSuites=org.apache.spark.deploy.yarn.YarnShuffleIntegrationWithLevelDBBackendSuite -Phadoop-2
    ```
    
    ```
    YarnShuffleIntegrationWithLevelDBBackendSuite:
    org.apache.spark.deploy.yarn.YarnShuffleIntegrationWithLevelDBBackendSuite *** ABORTED ***
      java.lang.IllegalArgumentException: Cannot support recovery with an ephemeral server port. Check the setting of yarn.nodemanager.address
      at org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl.serviceStart(ContainerManagerImpl.java:395)
      at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
      at org.apache.hadoop.service.CompositeService.serviceStart(CompositeService.java:120)
      at org.apache.hadoop.yarn.server.nodemanager.NodeManager.serviceStart(NodeManager.java:272)
      at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
      at org.apache.hadoop.yarn.server.MiniYARNCluster$NodeManagerWrapper.serviceStart(MiniYARNCluster.java:560)
      at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
      at org.apache.hadoop.service.CompositeService.serviceStart(CompositeService.java:120)
      at org.apache.hadoop.yarn.server.MiniYARNCluster.serviceStart(MiniYARNCluster.java:278)
      at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
      ...
    Run completed in 3 seconds, 992 milliseconds.
    Total number of tests run: 0
    Suites: completed 1, aborted 1
    Tests: succeeded 0, failed 0, canceled 0, ignored 0, pending 0
    *** 1 SUITE ABORTED ***
    ```
    
    The above tests show that a fixed port is needed to enable Yarn NodeManager recovery, but this is difficult to guarantee in UT, so this PR uses a workaround instead.
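
    For context, NodeManager recovery also requires `yarn.nodemanager.address` to be a fixed, non-ephemeral port. A hedged sketch of what that would mean for the suite (the port value is purely illustrative):

    ```
    // Recovery rejects an ephemeral port (0), so a concrete port would have to be
    // hard-coded, and a hard-coded port cannot be guaranteed to be free in UT.
    yarnConfig.set(YarnConfiguration.NM_ADDRESS, "0.0.0.0:45454")
    ```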
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Pass GitHub Actions
    
    Closes #37938 from LuciferYang/yarnshuffleservice-it.
    
    Lead-authored-by: yangjie01 <ya...@baidu.com>
    Co-authored-by: YangJie <ya...@baidu.com>
    Signed-off-by: Thomas Graves <tg...@apache.org>
---
 .../org/apache/spark/network/yarn/YarnShuffleService.java    |  9 +++++++++
 .../spark/deploy/yarn/YarnShuffleIntegrationSuite.scala      | 12 ++++--------
 .../org/apache/spark/network/yarn/YarnTestAccessor.scala     |  4 ++++
 3 files changed, 17 insertions(+), 8 deletions(-)

diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index 58f6b6500f6..bde358a638a 100644
--- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -60,6 +60,7 @@ import org.apache.spark.network.sasl.ShuffleSecretManager;
 import org.apache.spark.network.server.TransportServer;
 import org.apache.spark.network.server.TransportServerBootstrap;
 import org.apache.spark.network.shuffle.ExternalBlockHandler;
+import org.apache.spark.network.util.JavaUtils;
 import org.apache.spark.network.util.TransportConf;
 import org.apache.spark.network.yarn.util.HadoopConfigProvider;
 
@@ -129,6 +130,10 @@ public class YarnShuffleService extends AuxiliaryService {
   // Whether failure during service initialization should stop the NM.
   @VisibleForTesting
   static final String STOP_ON_FAILURE_KEY = "spark.yarn.shuffle.stopOnFailure";
+
+  @VisibleForTesting
+  static final String INTEGRATION_TESTING = "spark.yarn.shuffle.testing";
+
   private static final boolean DEFAULT_STOP_ON_FAILURE = false;
 
   // just for testing when you want to find an open port
@@ -237,6 +242,10 @@ public class YarnShuffleService extends AuxiliaryService {
 
     boolean stopOnFailure = _conf.getBoolean(STOP_ON_FAILURE_KEY, DEFAULT_STOP_ON_FAILURE);
 
+    if (_recoveryPath == null && _conf.getBoolean(INTEGRATION_TESTING, false)) {
+      _recoveryPath = new Path(JavaUtils.createTempDir().toURI());
+    }
+
     if (_recoveryPath != null) {
       String dbBackendName = _conf.get(Constants.SHUFFLE_SERVICE_DB_BACKEND,
         DBBackend.LEVELDB.name());
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
index 80e014fd062..deb95773676 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
@@ -50,6 +50,7 @@ abstract class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite {
       classOf[YarnShuffleService].getCanonicalName)
     yarnConfig.set(SHUFFLE_SERVICE_PORT.key, "0")
     yarnConfig.set(SHUFFLE_SERVICE_DB_BACKEND.key, dbBackend.name())
+    yarnConfig.set(YarnTestAccessor.shuffleServiceIntegrationTestingKey, "true")
     yarnConfig
   }
 
@@ -71,23 +72,18 @@ abstract class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite {
     val shuffleService = YarnTestAccessor.getShuffleServiceInstance
 
     val registeredExecFile = YarnTestAccessor.getRegisteredExecutorFile(shuffleService)
+    assert(registeredExecFile != null)
 
     val result = File.createTempFile("result", null, tempDir)
     val finalState = runSpark(
       false,
       mainClassName(YarnExternalShuffleDriver.getClass),
-      appArgs = if (registeredExecFile != null) {
-        Seq(result.getAbsolutePath, registeredExecFile.getAbsolutePath)
-      } else {
-        Seq(result.getAbsolutePath)
-      },
+      appArgs = Seq(result.getAbsolutePath, registeredExecFile.getAbsolutePath),
       extraConf = extraSparkConf()
     )
     checkResult(finalState, result)
 
-    if (registeredExecFile != null) {
-      assert(YarnTestAccessor.getRegisteredExecutorFile(shuffleService).exists())
-    }
+    assert(YarnTestAccessor.getRegisteredExecutorFile(shuffleService).exists())
   }
 }
 
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnTestAccessor.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnTestAccessor.scala
index d87cc263847..df7bfd800b1 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnTestAccessor.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnTestAccessor.scala
@@ -37,4 +37,8 @@ object YarnTestAccessor {
   def getShuffleServiceConfOverlayResourceName: String = {
     YarnShuffleService.SHUFFLE_SERVICE_CONF_OVERLAY_RESOURCE_NAME
   }
+
+  def shuffleServiceIntegrationTestingKey: String = {
+    YarnShuffleService.INTEGRATION_TESTING
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org