You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by tg...@apache.org on 2020/05/08 20:41:54 UTC

[spark] branch master updated: [SPARK-31235][FOLLOWUP][TESTS][TEST-HADOOP3.2] Fix test "specify a more specific type for the ap…

This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c1801fd  [SPARK-31235][FOLLOWUP][TESTS][TEST-HADOOP3.2] Fix test "specify a more specific type for the ap…
c1801fd is described below

commit c1801fd6da4a2dd5f37dc366b92bede669e8fda0
Author: wang-zhun <wa...@gmail.com>
AuthorDate: Fri May 8 15:41:23 2020 -0500

    [SPARK-31235][FOLLOWUP][TESTS][TEST-HADOOP3.2] Fix test "specify a more specific type for the ap…
    
    ### What changes were proposed in this pull request?
    Update the constructor arguments used to instantiate `RMAppManager` and `ClientRMService` in `ClientSuite`.
    
    ### Why are the changes needed?
    With Hadoop 3.2, if `RMAppManager` is not instantiated correctly, the following exception occurs:
    ```
    java.lang.RuntimeException: java.lang.NullPointerException
    	at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:135)
    	at org.apache.hadoop.yarn.security.YarnAuthorizationProvider.getInstance(YarnAuthorizationProvider.java:55)
    	at org.apache.hadoop.yarn.server.resourcemanager.RMAppManager.<init>(RMAppManager.java:117)
    ```
    
    ### How was this patch tested?
    Unit tests (UTs).
    
    Closes #28456 from wang-zhun/Fix-SPARK-31235.
    
    Authored-by: wang-zhun <wa...@gmail.com>
    Signed-off-by: Thomas Graves <tg...@apache.org>
---
 .../org/apache/spark/deploy/yarn/ClientSuite.scala | 86 +++++++++++-----------
 1 file changed, 43 insertions(+), 43 deletions(-)

diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index b335e7f..7611ccd 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.{ClientRMService, RMAppMana
 import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager
 import org.apache.hadoop.yarn.util.Records
 import org.mockito.ArgumentMatchers.{any, anyBoolean, anyShort, eq => meq}
@@ -222,11 +223,50 @@ class ClientSuite extends SparkFunSuite with Matchers {
       3 -> ("SPARK-SQL", "SPARK-SQL"),
       4 -> ("012345678901234567890123", "01234567890123456789"))
 
+    // Mock yarn submit application
+    val yarnClient = mock(classOf[YarnClient])
+    val rmApps = new ConcurrentHashMap[ApplicationId, RMApp]()
+    val rmContext = mock(classOf[RMContext])
+    when(rmContext.getRMApps).thenReturn(rmApps)
+    val dispatcher = mock(classOf[Dispatcher])
+    when(rmContext.getDispatcher).thenReturn(dispatcher)
+    when[EventHandler[_]](dispatcher.getEventHandler).thenReturn(
+      new EventHandler[Event[_]] {
+        override def handle(event: Event[_]): Unit = {}
+      }
+    )
+    val writer = mock(classOf[RMApplicationHistoryWriter])
+    when(rmContext.getRMApplicationHistoryWriter).thenReturn(writer)
+    val publisher = mock(classOf[SystemMetricsPublisher])
+    when(rmContext.getSystemMetricsPublisher).thenReturn(publisher)
+    val yarnScheduler = mock(classOf[YarnScheduler])
+    val rmAppManager = new RMAppManager(rmContext,
+      yarnScheduler,
+      null,
+      mock(classOf[ApplicationACLsManager]),
+      new Configuration())
+    val clientRMService = new ClientRMService(rmContext,
+      yarnScheduler,
+      rmAppManager,
+      null,
+      null,
+      null)
+    clientRMService.init(new Configuration())
+    when(yarnClient.submitApplication(any())).thenAnswer((invocationOnMock: InvocationOnMock) => {
+      val subContext = invocationOnMock.getArguments()(0)
+        .asInstanceOf[ApplicationSubmissionContext]
+      val request = Records.newRecord(classOf[SubmitApplicationRequest])
+      request.setApplicationSubmissionContext(subContext)
+      clientRMService.submitApplication(request)
+      null
+    })
+
+    // Spark submit application
+    val appContext = spy(Records.newRecord(classOf[ApplicationSubmissionContext]))
+    when(appContext.getUnmanagedAM).thenReturn(true)
     for ((id, (sourceType, targetType)) <- appTypes) {
       val sparkConf = new SparkConf().set("spark.yarn.applicationType", sourceType)
       val args = new ClientArguments(Array())
-
-      val appContext = spy(Records.newRecord(classOf[ApplicationSubmissionContext]))
       val appId = ApplicationId.newInstance(123456, id)
       appContext.setApplicationId(appId)
       val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse])
@@ -237,48 +277,8 @@ class ClientSuite extends SparkFunSuite with Matchers {
         new YarnClientApplication(getNewApplicationResponse, appContext),
         containerLaunchContext)
 
-      val yarnClient = mock(classOf[YarnClient])
-      when(yarnClient.submitApplication(any())).thenAnswer((invocationOnMock: InvocationOnMock) => {
-        val subContext = invocationOnMock.getArguments()(0)
-          .asInstanceOf[ApplicationSubmissionContext]
-        val request = Records.newRecord(classOf[SubmitApplicationRequest])
-        request.setApplicationSubmissionContext(subContext)
-
-        val rmContext = mock(classOf[RMContext])
-        val conf = mock(classOf[Configuration])
-        val map = new ConcurrentHashMap[ApplicationId, RMApp]()
-        when(rmContext.getRMApps).thenReturn(map)
-        val dispatcher = mock(classOf[Dispatcher])
-        when(rmContext.getDispatcher).thenReturn(dispatcher)
-        when[EventHandler[_]](dispatcher.getEventHandler).thenReturn(
-          new EventHandler[Event[_]] {
-            override def handle(event: Event[_]): Unit = {}
-          }
-        )
-        val writer = mock(classOf[RMApplicationHistoryWriter])
-        when(rmContext.getRMApplicationHistoryWriter).thenReturn(writer)
-        val publisher = mock(classOf[SystemMetricsPublisher])
-        when(rmContext.getSystemMetricsPublisher).thenReturn(publisher)
-        when(appContext.getUnmanagedAM).thenReturn(true)
-
-        val rmAppManager = new RMAppManager(rmContext,
-          null,
-          null,
-          mock(classOf[ApplicationACLsManager]),
-          conf)
-        val clientRMService = new ClientRMService(rmContext,
-          null,
-          rmAppManager,
-          null,
-          null,
-          null)
-        clientRMService.submitApplication(request)
-
-        assert(map.get(subContext.getApplicationId).getApplicationType === targetType)
-        null
-      })
-
       yarnClient.submitApplication(context)
+      assert(rmApps.get(appId).getApplicationType === targetType)
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org