You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by tg...@apache.org on 2020/05/08 20:41:54 UTC
[spark] branch master updated: [SPARK-31235][FOLLOWUP][TESTS][TEST-HADOOP3.2] Fix test "specify a more specific type for the ap…
This is an automated email from the ASF dual-hosted git repository.
tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new c1801fd [SPARK-31235][FOLLOWUP][TESTS][TEST-HADOOP3.2] Fix test "specify a more specific type for the ap…
c1801fd is described below
commit c1801fd6da4a2dd5f37dc366b92bede669e8fda0
Author: wang-zhun <wa...@gmail.com>
AuthorDate: Fri May 8 15:41:23 2020 -0500
[SPARK-31235][FOLLOWUP][TESTS][TEST-HADOOP3.2] Fix test "specify a more specific type for the ap…
### What changes were proposed in this pull request?
Update the input parameters for instantiating `RMAppManager` and `ClientRMService`
### Why are the changes needed?
For hadoop3.2, if `RMAppManager` is not created correctly, the following exception will occur:
```
java.lang.RuntimeException: java.lang.NullPointerException
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:135)
at org.apache.hadoop.yarn.security.YarnAuthorizationProvider.getInstance(YarnAuthorizationProvider.java:55)
at org.apache.hadoop.yarn.server.resourcemanager.RMAppManager.<init>(RMAppManager.java:117)
```
### How was this patch tested?
UTs
Closes #28456 from wang-zhun/Fix-SPARK-31235.
Authored-by: wang-zhun <wa...@gmail.com>
Signed-off-by: Thomas Graves <tg...@apache.org>
---
.../org/apache/spark/deploy/yarn/ClientSuite.scala | 86 +++++++++++-----------
1 file changed, 43 insertions(+), 43 deletions(-)
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index b335e7f..7611ccd 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.{ClientRMService, RMAppMana
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager
import org.apache.hadoop.yarn.util.Records
import org.mockito.ArgumentMatchers.{any, anyBoolean, anyShort, eq => meq}
@@ -222,11 +223,50 @@ class ClientSuite extends SparkFunSuite with Matchers {
3 -> ("SPARK-SQL", "SPARK-SQL"),
4 -> ("012345678901234567890123", "01234567890123456789"))
+ // Mock yarn submit application
+ val yarnClient = mock(classOf[YarnClient])
+ val rmApps = new ConcurrentHashMap[ApplicationId, RMApp]()
+ val rmContext = mock(classOf[RMContext])
+ when(rmContext.getRMApps).thenReturn(rmApps)
+ val dispatcher = mock(classOf[Dispatcher])
+ when(rmContext.getDispatcher).thenReturn(dispatcher)
+ when[EventHandler[_]](dispatcher.getEventHandler).thenReturn(
+ new EventHandler[Event[_]] {
+ override def handle(event: Event[_]): Unit = {}
+ }
+ )
+ val writer = mock(classOf[RMApplicationHistoryWriter])
+ when(rmContext.getRMApplicationHistoryWriter).thenReturn(writer)
+ val publisher = mock(classOf[SystemMetricsPublisher])
+ when(rmContext.getSystemMetricsPublisher).thenReturn(publisher)
+ val yarnScheduler = mock(classOf[YarnScheduler])
+ val rmAppManager = new RMAppManager(rmContext,
+ yarnScheduler,
+ null,
+ mock(classOf[ApplicationACLsManager]),
+ new Configuration())
+ val clientRMService = new ClientRMService(rmContext,
+ yarnScheduler,
+ rmAppManager,
+ null,
+ null,
+ null)
+ clientRMService.init(new Configuration())
+ when(yarnClient.submitApplication(any())).thenAnswer((invocationOnMock: InvocationOnMock) => {
+ val subContext = invocationOnMock.getArguments()(0)
+ .asInstanceOf[ApplicationSubmissionContext]
+ val request = Records.newRecord(classOf[SubmitApplicationRequest])
+ request.setApplicationSubmissionContext(subContext)
+ clientRMService.submitApplication(request)
+ null
+ })
+
+ // Spark submit application
+ val appContext = spy(Records.newRecord(classOf[ApplicationSubmissionContext]))
+ when(appContext.getUnmanagedAM).thenReturn(true)
for ((id, (sourceType, targetType)) <- appTypes) {
val sparkConf = new SparkConf().set("spark.yarn.applicationType", sourceType)
val args = new ClientArguments(Array())
-
- val appContext = spy(Records.newRecord(classOf[ApplicationSubmissionContext]))
val appId = ApplicationId.newInstance(123456, id)
appContext.setApplicationId(appId)
val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse])
@@ -237,48 +277,8 @@ class ClientSuite extends SparkFunSuite with Matchers {
new YarnClientApplication(getNewApplicationResponse, appContext),
containerLaunchContext)
- val yarnClient = mock(classOf[YarnClient])
- when(yarnClient.submitApplication(any())).thenAnswer((invocationOnMock: InvocationOnMock) => {
- val subContext = invocationOnMock.getArguments()(0)
- .asInstanceOf[ApplicationSubmissionContext]
- val request = Records.newRecord(classOf[SubmitApplicationRequest])
- request.setApplicationSubmissionContext(subContext)
-
- val rmContext = mock(classOf[RMContext])
- val conf = mock(classOf[Configuration])
- val map = new ConcurrentHashMap[ApplicationId, RMApp]()
- when(rmContext.getRMApps).thenReturn(map)
- val dispatcher = mock(classOf[Dispatcher])
- when(rmContext.getDispatcher).thenReturn(dispatcher)
- when[EventHandler[_]](dispatcher.getEventHandler).thenReturn(
- new EventHandler[Event[_]] {
- override def handle(event: Event[_]): Unit = {}
- }
- )
- val writer = mock(classOf[RMApplicationHistoryWriter])
- when(rmContext.getRMApplicationHistoryWriter).thenReturn(writer)
- val publisher = mock(classOf[SystemMetricsPublisher])
- when(rmContext.getSystemMetricsPublisher).thenReturn(publisher)
- when(appContext.getUnmanagedAM).thenReturn(true)
-
- val rmAppManager = new RMAppManager(rmContext,
- null,
- null,
- mock(classOf[ApplicationACLsManager]),
- conf)
- val clientRMService = new ClientRMService(rmContext,
- null,
- rmAppManager,
- null,
- null,
- null)
- clientRMService.submitApplication(request)
-
- assert(map.get(subContext.getApplicationId).getApplicationType === targetType)
- null
- })
-
yarnClient.submitApplication(context)
+ assert(rmApps.get(appId).getApplicationType === targetType)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org