Posted to commits@spark.apache.org by tg...@apache.org on 2022/09/21 13:05:34 UTC
[spark] branch master updated: [SPARK-40490][YARN][TESTS] Ensure `YarnShuffleIntegrationSuite` tests registeredExecFile reload scenarios
This is an automated email from the ASF dual-hosted git repository.
tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e6699570bec [SPARK-40490][YARN][TESTS] Ensure `YarnShuffleIntegrationSuite` tests registeredExecFile reload scenarios
e6699570bec is described below
commit e6699570becadb91695572bca5adc1605dc1b2a8
Author: yangjie01 <ya...@baidu.com>
AuthorDate: Wed Sep 21 08:05:17 2022 -0500
[SPARK-40490][YARN][TESTS] Ensure `YarnShuffleIntegrationSuite` tests registeredExecFile reload scenarios
### What changes were proposed in this pull request?
After SPARK-17321, `YarnShuffleService` persists data to the local shuffle state db and reloads data from it only when the Yarn NodeManager starts with `YarnConfiguration#NM_RECOVERY_ENABLED = true`.
`YarnShuffleIntegrationSuite` does not set `YarnConfiguration#NM_RECOVERY_ENABLED`, and the default value of this configuration is false, so `YarnShuffleIntegrationSuite` neither triggers data persistence to the db nor verifies the reload of data.
This PR aims to let `YarnShuffleIntegrationSuite` resume verifying the registeredExecFile reload scenarios. To achieve this, the PR makes the following changes:
1. Add a new undocumented configuration `spark.yarn.shuffle.testing` to `YarnShuffleService`, and initialize `_recoveryPath` when `_recoveryPath == null && spark.yarn.shuffle.testing == true`.
2. Set `spark.yarn.shuffle.testing = true` only in `YarnShuffleIntegrationSuite`, and add assertions that `registeredExecFile` is not null to ensure that the registeredExecFile reload scenarios are verified.
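The fallback described in change 1 can be sketched in isolation as below. This is a minimal illustration, not the code in `YarnShuffleService`: the class `RecoveryPathSketch`, the method `resolveRecoveryPath`, and the use of a plain `Map` in place of the Hadoop configuration are all hypothetical, and `Files.createTempDirectory` stands in for `JavaUtils.createTempDir()`. Only the key `spark.yarn.shuffle.testing` comes from the patch.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;

public class RecoveryPathSketch {
    // Key introduced by this patch; the rest of this class is illustrative only.
    static final String INTEGRATION_TESTING = "spark.yarn.shuffle.testing";

    // Returns the recovery path to use, or null when state persistence stays disabled.
    // The Map is a hypothetical stand-in for the Hadoop Configuration object.
    static Path resolveRecoveryPath(Path recoveryPath, Map<String, String> conf) {
        boolean testing =
            Boolean.parseBoolean(conf.getOrDefault(INTEGRATION_TESTING, "false"));
        if (recoveryPath == null && testing) {
            try {
                // Stand-in for JavaUtils.createTempDir() used in the real patch.
                return Files.createTempDirectory("shuffle-recovery");
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        // An explicitly provided path always wins; without the flag, null stays null.
        return recoveryPath;
    }

    public static void main(String[] args) {
        // Flag absent: no recovery path, so nothing would be persisted.
        System.out.println(resolveRecoveryPath(null, Map.of()));
        // Flag set: a temporary directory is created and used as the recovery path.
        System.out.println(resolveRecoveryPath(null, Map.of(INTEGRATION_TESTING, "true")));
    }
}
```

Because the flag only takes effect when no recovery path was supplied, a NodeManager that already provides one is unaffected by the new configuration.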
### Why are the changes needed?
Fix registeredExecFile reload test scenarios.
Why not test by setting `YarnConfiguration#NM_RECOVERY_ENABLED` to true?
This configuration was tried, and the suite failed on both Hadoop versions:
**Hadoop 3.3.4**
```
build/mvn clean install -pl resource-managers/yarn -Pyarn -Dtest=none -DwildcardSuites=org.apache.spark.deploy.yarn.YarnShuffleIntegrationWithLevelDBBackendSuite -Phadoop-3
```
```
2022-09-10T11:44:42.1710230Z Cause: java.lang.ClassNotFoundException: org.apache.hadoop.shaded.org.iq80.leveldb.DBException
2022-09-10T11:44:42.1715234Z at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
2022-09-10T11:44:42.1719347Z at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
2022-09-10T11:44:42.1723090Z at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
2022-09-10T11:44:42.1726759Z at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
2022-09-10T11:44:42.1731028Z at org.apache.hadoop.yarn.server.nodemanager.NodeManager.initAndStartRecoveryStore(NodeManager.java:313)
2022-09-10T11:44:42.1735424Z at org.apache.hadoop.yarn.server.nodemanager.NodeManager.serviceInit(NodeManager.java:370)
2022-09-10T11:44:42.1740303Z at org.apache.hadoop.service.AbstractService.init(AbstractService.java:164)
2022-09-10T11:44:42.1745576Z at org.apache.hadoop.yarn.server.MiniYARNCluster$NodeManagerWrapper.serviceInit(MiniYARNCluster.java:597)
2022-09-10T11:44:42.1828858Z at org.apache.hadoop.service.AbstractService.init(AbstractService.java:164)
2022-09-10T11:44:42.1829712Z at org.apache.hadoop.service.CompositeService.serviceInit(CompositeService.java:109)
2022-09-10T11:44:42.1830633Z at org.apache.hadoop.yarn.server.MiniYARNCluster.serviceInit(MiniYARNCluster.java:327)
2022-09-10T11:44:42.1831431Z at org.apache.hadoop.service.AbstractService.init(AbstractService.java:164)
2022-09-10T11:44:42.1832279Z at org.apache.spark.deploy.yarn.BaseYarnClusterSuite.beforeAll(BaseYarnClusterSuite.scala:112)
```
**Hadoop 2.7.4**
```
build/mvn clean install -pl resource-managers/yarn -Pyarn -Dtest=none -DwildcardSuites=org.apache.spark.deploy.yarn.YarnShuffleIntegrationWithLevelDBBackendSuite -Phadoop-2
```
```
YarnShuffleIntegrationWithLevelDBBackendSuite:
org.apache.spark.deploy.yarn.YarnShuffleIntegrationWithLevelDBBackendSuite *** ABORTED ***
java.lang.IllegalArgumentException: Cannot support recovery with an ephemeral server port. Check the setting of yarn.nodemanager.address
at org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl.serviceStart(ContainerManagerImpl.java:395)
at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
at org.apache.hadoop.service.CompositeService.serviceStart(CompositeService.java:120)
at org.apache.hadoop.yarn.server.nodemanager.NodeManager.serviceStart(NodeManager.java:272)
at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
at org.apache.hadoop.yarn.server.MiniYARNCluster$NodeManagerWrapper.serviceStart(MiniYARNCluster.java:560)
at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
at org.apache.hadoop.service.CompositeService.serviceStart(CompositeService.java:120)
at org.apache.hadoop.yarn.server.MiniYARNCluster.serviceStart(MiniYARNCluster.java:278)
at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
...
Run completed in 3 seconds, 992 milliseconds.
Total number of tests run: 0
Suites: completed 1, aborted 1
Tests: succeeded 0, failed 0, canceled 0, ignored 0, pending 0
*** 1 SUITE ABORTED ***
```
The tests above show that enabling Yarn NodeManager recovery requires a fixed port, which is difficult to guarantee in unit tests, so this PR uses a workaround instead.
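As a rough illustration of why a fixed port is hard to guarantee in a test, the sketch below binds a socket to port 0, which asks the operating system for any free port. The class and method names are hypothetical and this is not code from Hadoop or Spark; it only shows that the port actually assigned is not known in advance, which appears to be the situation the "ephemeral server port" error above rejects.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

public class EphemeralPortSketch {
    // Binds to port 0 (let the OS choose) and returns the port that was assigned.
    static int bindEphemeral() {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // The configured port is 0, but the bound port is chosen at runtime.
        System.out.println("configured port: 0, bound port: " + bindEphemeral());
    }
}
```

Hard-coding a specific port instead would risk collisions with other processes on a shared CI machine, which is one reason test suites commonly rely on port 0.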
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass GitHub Actions
Closes #37938 from LuciferYang/yarnshuffleservice-it.
Lead-authored-by: yangjie01 <ya...@baidu.com>
Co-authored-by: YangJie <ya...@baidu.com>
Signed-off-by: Thomas Graves <tg...@apache.org>
---
.../org/apache/spark/network/yarn/YarnShuffleService.java | 9 +++++++++
.../spark/deploy/yarn/YarnShuffleIntegrationSuite.scala | 12 ++++--------
.../org/apache/spark/network/yarn/YarnTestAccessor.scala | 4 ++++
3 files changed, 17 insertions(+), 8 deletions(-)
diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index 58f6b6500f6..bde358a638a 100644
--- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -60,6 +60,7 @@ import org.apache.spark.network.sasl.ShuffleSecretManager;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.server.TransportServerBootstrap;
import org.apache.spark.network.shuffle.ExternalBlockHandler;
+import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.yarn.util.HadoopConfigProvider;
@@ -129,6 +130,10 @@ public class YarnShuffleService extends AuxiliaryService {
// Whether failure during service initialization should stop the NM.
@VisibleForTesting
static final String STOP_ON_FAILURE_KEY = "spark.yarn.shuffle.stopOnFailure";
+
+ @VisibleForTesting
+ static final String INTEGRATION_TESTING = "spark.yarn.shuffle.testing";
+
private static final boolean DEFAULT_STOP_ON_FAILURE = false;
// just for testing when you want to find an open port
@@ -237,6 +242,10 @@ public class YarnShuffleService extends AuxiliaryService {
boolean stopOnFailure = _conf.getBoolean(STOP_ON_FAILURE_KEY, DEFAULT_STOP_ON_FAILURE);
+ if (_recoveryPath == null && _conf.getBoolean(INTEGRATION_TESTING, false)) {
+ _recoveryPath = new Path(JavaUtils.createTempDir().toURI());
+ }
+
if (_recoveryPath != null) {
String dbBackendName = _conf.get(Constants.SHUFFLE_SERVICE_DB_BACKEND,
DBBackend.LEVELDB.name());
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
index 80e014fd062..deb95773676 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
@@ -50,6 +50,7 @@ abstract class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite {
classOf[YarnShuffleService].getCanonicalName)
yarnConfig.set(SHUFFLE_SERVICE_PORT.key, "0")
yarnConfig.set(SHUFFLE_SERVICE_DB_BACKEND.key, dbBackend.name())
+ yarnConfig.set(YarnTestAccessor.shuffleServiceIntegrationTestingKey, "true")
yarnConfig
}
@@ -71,23 +72,18 @@ abstract class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite {
val shuffleService = YarnTestAccessor.getShuffleServiceInstance
val registeredExecFile = YarnTestAccessor.getRegisteredExecutorFile(shuffleService)
+ assert(registeredExecFile != null)
val result = File.createTempFile("result", null, tempDir)
val finalState = runSpark(
false,
mainClassName(YarnExternalShuffleDriver.getClass),
- appArgs = if (registeredExecFile != null) {
- Seq(result.getAbsolutePath, registeredExecFile.getAbsolutePath)
- } else {
- Seq(result.getAbsolutePath)
- },
+ appArgs = Seq(result.getAbsolutePath, registeredExecFile.getAbsolutePath),
extraConf = extraSparkConf()
)
checkResult(finalState, result)
- if (registeredExecFile != null) {
- assert(YarnTestAccessor.getRegisteredExecutorFile(shuffleService).exists())
- }
+ assert(YarnTestAccessor.getRegisteredExecutorFile(shuffleService).exists())
}
}
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnTestAccessor.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnTestAccessor.scala
index d87cc263847..df7bfd800b1 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnTestAccessor.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnTestAccessor.scala
@@ -37,4 +37,8 @@ object YarnTestAccessor {
def getShuffleServiceConfOverlayResourceName: String = {
YarnShuffleService.SHUFFLE_SERVICE_CONF_OVERLAY_RESOURCE_NAME
}
+
+ def shuffleServiceIntegrationTestingKey: String = {
+ YarnShuffleService.INTEGRATION_TESTING
+ }
}