You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2016/09/20 21:17:57 UTC

spark git commit: [SPARK-17611][YARN][TEST] Make shuffle service test really test auth.

Repository: spark
Updated Branches:
  refs/heads/master 9ac68dbc5 -> 7e418e99c


[SPARK-17611][YARN][TEST] Make shuffle service test really test auth.

Currently, the code is just swallowing exceptions, and not really checking
whether the auth information was being recorded properly. Fix both problems,
and also avoid tests inadvertently affecting other tests by modifying the
shared config variable (by making it not shared).

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #15161 from vanzin/SPARK-17611.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7e418e99
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7e418e99
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7e418e99

Branch: refs/heads/master
Commit: 7e418e99cff4cf512ab2a9fa74221c4655048c8d
Parents: 9ac68dbc
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Tue Sep 20 14:17:49 2016 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Tue Sep 20 14:17:49 2016 -0700

----------------------------------------------------------------------
 .../spark/network/yarn/YarnShuffleService.java  | 11 +++--
 .../network/yarn/YarnShuffleServiceSuite.scala  | 49 +++++++++++---------
 2 files changed, 33 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7e418e99/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
----------------------------------------------------------------------
diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index 43c8df7..ea726e3 100644
--- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -105,7 +105,8 @@ public class YarnShuffleService extends AuxiliaryService {
 
   // An entity that manages the shuffle secret per application
   // This is used only if authentication is enabled
-  private ShuffleSecretManager secretManager;
+  @VisibleForTesting
+  ShuffleSecretManager secretManager;
 
   // The actual server that serves shuffle files
   private TransportServer shuffleServer = null;
@@ -197,7 +198,7 @@ public class YarnShuffleService extends AuxiliaryService {
   private void createSecretManager() throws IOException {
     secretManager = new ShuffleSecretManager();
     secretsFile = initRecoveryDb(SECRETS_RECOVERY_FILE_NAME);
- 
+
     // Make sure this is protected in case its not in the NM recovery dir
     FileSystem fs = FileSystem.getLocal(_conf);
     fs.mkdirs(new Path(secretsFile.getPath()), new FsPermission((short)0700));
@@ -306,7 +307,7 @@ public class YarnShuffleService extends AuxiliaryService {
       }
       if (db != null) {
         db.close();
-      } 
+      }
     } catch (Exception e) {
       logger.error("Exception when stopping service", e);
     }
@@ -329,7 +330,7 @@ public class YarnShuffleService extends AuxiliaryService {
 
   /**
    * Get the path specific to this auxiliary service to use for recovery.
-   */ 
+   */
   protected Path getRecoveryPath(String fileName) {
     return _recoveryPath;
   }
@@ -345,7 +346,7 @@ public class YarnShuffleService extends AuxiliaryService {
         if (recoveryFile.exists()) {
           return recoveryFile;
         }
-    } 
+    }
     // db doesn't exist in recovery path go check local dirs for it
     String[] localDirs = _conf.getTrimmedStrings("yarn.nodemanager.local-dirs");
     for (String dir : localDirs) {

http://git-wip-us.apache.org/repos/asf/spark/blob/7e418e99/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
index c86bf7f..a58784f 100644
--- a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.network.yarn
 
 import java.io.{DataOutputStream, File, FileOutputStream, IOException}
+import java.nio.ByteBuffer
 import java.nio.file.Files
 import java.nio.file.attribute.PosixFilePermission._
 import java.util.EnumSet
@@ -40,15 +41,17 @@ import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
 import org.apache.spark.util.Utils
 
 class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach {
-  private[yarn] var yarnConfig: YarnConfiguration = new YarnConfiguration
+  private[yarn] var yarnConfig: YarnConfiguration = null
   private[yarn] val SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager"
 
   override def beforeEach(): Unit = {
     super.beforeEach()
+    yarnConfig = new YarnConfiguration()
     yarnConfig.set(YarnConfiguration.NM_AUX_SERVICES, "spark_shuffle")
     yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark_shuffle"),
       classOf[YarnShuffleService].getCanonicalName)
     yarnConfig.setInt("spark.shuffle.service.port", 0)
+    yarnConfig.setBoolean(YarnShuffleService.STOP_ON_FAILURE_KEY, true)
     val localDir = Utils.createTempDir()
     yarnConfig.set(YarnConfiguration.NM_LOCAL_DIRS, localDir.getAbsolutePath)
   }
@@ -82,12 +85,10 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
     yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, true)
     s1.init(yarnConfig)
     val app1Id = ApplicationId.newInstance(0, 1)
-    val app1Data: ApplicationInitializationContext =
-      new ApplicationInitializationContext("user", app1Id, null)
+    val app1Data = makeAppInfo("user", app1Id)
     s1.initializeApplication(app1Data)
     val app2Id = ApplicationId.newInstance(0, 2)
-    val app2Data: ApplicationInitializationContext =
-      new ApplicationInitializationContext("user", app2Id, null)
+    val app2Data = makeAppInfo("user", app2Id)
     s1.initializeApplication(app2Data)
 
     val execStateFile = s1.registeredExecutorFile
@@ -160,12 +161,10 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
     val secretsFile = s1.secretsFile
     secretsFile should be (null)
     val app1Id = ApplicationId.newInstance(0, 1)
-    val app1Data: ApplicationInitializationContext =
-      new ApplicationInitializationContext("user", app1Id, null)
+    val app1Data = makeAppInfo("user", app1Id)
     s1.initializeApplication(app1Data)
     val app2Id = ApplicationId.newInstance(0, 2)
-    val app2Data: ApplicationInitializationContext =
-      new ApplicationInitializationContext("user", app2Id, null)
+    val app2Data = makeAppInfo("user", app2Id)
     s1.initializeApplication(app2Data)
 
     val execStateFile = s1.registeredExecutorFile
@@ -193,8 +192,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
     s1 = new YarnShuffleService
     s1.init(yarnConfig)
     val app1Id = ApplicationId.newInstance(0, 1)
-    val app1Data: ApplicationInitializationContext =
-      new ApplicationInitializationContext("user", app1Id, null)
+    val app1Data = makeAppInfo("user", app1Id)
     s1.initializeApplication(app1Data)
 
     val execStateFile = s1.registeredExecutorFile
@@ -227,8 +225,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
     s2.initializeApplication(app1Data)
     // however, when we initialize a totally new app2, everything is still happy
     val app2Id = ApplicationId.newInstance(0, 2)
-    val app2Data: ApplicationInitializationContext =
-      new ApplicationInitializationContext("user", app2Id, null)
+    val app2Data = makeAppInfo("user", app2Id)
     s2.initializeApplication(app2Data)
     val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER)
     resolver2.registerExecutor(app2Id.toString, "exec-2", shuffleInfo2)
@@ -278,14 +275,15 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
     yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, true)
     s1.init(yarnConfig)
     val app1Id = ApplicationId.newInstance(0, 1)
-    val app1Data: ApplicationInitializationContext =
-      new ApplicationInitializationContext("user", app1Id, null)
+    val app1Data = makeAppInfo("user", app1Id)
     s1.initializeApplication(app1Data)
     val app2Id = ApplicationId.newInstance(0, 2)
-    val app2Data: ApplicationInitializationContext =
-      new ApplicationInitializationContext("user", app2Id, null)
+    val app2Data = makeAppInfo("user", app2Id)
     s1.initializeApplication(app2Data)
 
+    assert(s1.secretManager.getSecretKey(app1Id.toString()) != null)
+    assert(s1.secretManager.getSecretKey(app2Id.toString()) != null)
+
     val execStateFile = s1.registeredExecutorFile
     execStateFile should not be (null)
     val secretsFile = s1.secretsFile
@@ -315,6 +313,10 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
     s2.setRecoveryPath(recoveryPath)
     s2.init(yarnConfig)
 
+    // Ensure that s2 has loaded known apps from the secrets db.
+    assert(s2.secretManager.getSecretKey(app1Id.toString()) != null)
+    assert(s2.secretManager.getSecretKey(app2Id.toString()) != null)
+
     val execStateFile2 = s2.registeredExecutorFile
     val secretsFile2 = s2.secretsFile
 
@@ -342,19 +344,17 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
   }
 
   test("service throws error if cannot start") {
-    // Create a different config with a read-only local dir.
-    val roConfig = new YarnConfiguration(yarnConfig)
+    // Set up a read-only local dir.
     val roDir = Utils.createTempDir()
     Files.setPosixFilePermissions(roDir.toPath(), EnumSet.of(OWNER_READ, OWNER_EXECUTE))
-    roConfig.set(YarnConfiguration.NM_LOCAL_DIRS, roDir.getAbsolutePath())
-    roConfig.setBoolean(YarnShuffleService.STOP_ON_FAILURE_KEY, true)
+    yarnConfig.set(YarnConfiguration.NM_LOCAL_DIRS, roDir.getAbsolutePath())
 
     // Try to start the shuffle service, it should fail.
     val service = new YarnShuffleService()
 
     try {
       val error = intercept[ServiceStateException] {
-        service.init(roConfig)
+        service.init(yarnConfig)
       }
       assert(error.getCause().isInstanceOf[IOException])
     } finally {
@@ -364,4 +364,9 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
     }
   }
 
+  private def makeAppInfo(user: String, appId: ApplicationId): ApplicationInitializationContext = {
+    val secret = ByteBuffer.wrap(new Array[Byte](0))
+    new ApplicationInitializationContext(user, appId, secret)
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org