You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2017/07/26 00:57:31 UTC

spark git commit: [SPARK-21494][NETWORK] Use correct app id when authenticating to external service.

Repository: spark
Updated Branches:
  refs/heads/master ebc24a9b7 -> 300807c6e


[SPARK-21494][NETWORK] Use correct app id when authenticating to external service.

Some code in the new auth client, carried over from the old SASL handler, was
incorrectly using the SASL user, instead of the application ID, as the identity
when authenticating against the external shuffle service. Because of this, the
external service could not find the correct secret for the connection, and
authentication failed.

While debugging, I found that some log messages from the YARN shuffle service
were a little noisy, so I silenced some of them and added a couple of new ones
that helped track down this issue. I also found that a check in the code that
records app secrets was wrong: it called `contains(appId)`, which on this map
type searches the map's values rather than its keys. This caused extra log spam
and turned what should be an O(1) key lookup into an O(n) scan; the check now
uses `containsKey(appId)`.

Also added a new integration suite for the YARN shuffle service with
authentication enabled, and verified that it failed before this change and
passes now.

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #18706 from vanzin/SPARK-21494.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/300807c6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/300807c6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/300807c6

Branch: refs/heads/master
Commit: 300807c6e3011e4d78c6cf750201d0ab8e5bdaf5
Parents: ebc24a9
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Tue Jul 25 17:57:26 2017 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Tue Jul 25 17:57:26 2017 -0700

----------------------------------------------------------------------
 .../network/crypto/AuthClientBootstrap.java     |  6 +--
 .../spark/network/crypto/AuthRpcHandler.java    |  7 +++-
 .../network/sasl/ShuffleSecretManager.java      |  4 +-
 .../spark/network/yarn/YarnShuffleService.java  |  2 -
 .../yarn/YarnShuffleIntegrationSuite.scala      | 42 +++++++++++++++++---
 5 files changed, 47 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/300807c6/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java
index 799f454..3c26378 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java
@@ -50,7 +50,6 @@ public class AuthClientBootstrap implements TransportClientBootstrap {
 
   private final TransportConf conf;
   private final String appId;
-  private final String authUser;
   private final SecretKeyHolder secretKeyHolder;
 
   public AuthClientBootstrap(
@@ -65,7 +64,6 @@ public class AuthClientBootstrap implements TransportClientBootstrap {
     // required by the protocol. At some point, though, it would be better for the actual app ID
     // to be provided here.
     this.appId = appId;
-    this.authUser = secretKeyHolder.getSaslUser(appId);
     this.secretKeyHolder = secretKeyHolder;
   }
 
@@ -97,8 +95,8 @@ public class AuthClientBootstrap implements TransportClientBootstrap {
   private void doSparkAuth(TransportClient client, Channel channel)
     throws GeneralSecurityException, IOException {
 
-    String secretKey = secretKeyHolder.getSecretKey(authUser);
-    try (AuthEngine engine = new AuthEngine(authUser, secretKey, conf)) {
+    String secretKey = secretKeyHolder.getSecretKey(appId);
+    try (AuthEngine engine = new AuthEngine(appId, secretKey, conf)) {
       ClientChallenge challenge = engine.challenge();
       ByteBuf challengeData = Unpooled.buffer(challenge.encodedLength());
       challenge.encode(challengeData);

http://git-wip-us.apache.org/repos/asf/spark/blob/300807c6/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java
index 0a5c029..8a6e385 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java
@@ -20,6 +20,7 @@ package org.apache.spark.network.crypto;
 import java.nio.ByteBuffer;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
@@ -113,7 +114,11 @@ class AuthRpcHandler extends RpcHandler {
     // Here we have the client challenge, so perform the new auth protocol and set up the channel.
     AuthEngine engine = null;
     try {
-      engine = new AuthEngine(challenge.appId, secretKeyHolder.getSecretKey(challenge.appId), conf);
+      String secret = secretKeyHolder.getSecretKey(challenge.appId);
+      Preconditions.checkState(secret != null,
+        "Trying to authenticate non-registered app %s.", challenge.appId);
+      LOG.debug("Authenticating challenge for app {}.", challenge.appId);
+      engine = new AuthEngine(challenge.appId, secret, conf);
       ServerResponse response = engine.respond(challenge);
       ByteBuf responseData = Unpooled.buffer(response.encodedLength());
       response.encode(responseData);

http://git-wip-us.apache.org/repos/asf/spark/blob/300807c6/common/network-shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java b/common/network-shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java
index 426a604..d2d008f 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java
@@ -47,7 +47,7 @@ public class ShuffleSecretManager implements SecretKeyHolder {
    * fetching shuffle files written by other executors in this application.
    */
   public void registerApp(String appId, String shuffleSecret) {
-    if (!shuffleSecretMap.contains(appId)) {
+    if (!shuffleSecretMap.containsKey(appId)) {
       shuffleSecretMap.put(appId, shuffleSecret);
       logger.info("Registered shuffle secret for application {}", appId);
     } else {
@@ -67,7 +67,7 @@ public class ShuffleSecretManager implements SecretKeyHolder {
    * This is called when the application terminates.
    */
   public void unregisterApp(String appId) {
-    if (shuffleSecretMap.contains(appId)) {
+    if (shuffleSecretMap.containsKey(appId)) {
       shuffleSecretMap.remove(appId);
       logger.info("Unregistered shuffle secret for application {}", appId);
     } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/300807c6/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
----------------------------------------------------------------------
diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index fd50e3a..cd67eb2 100644
--- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -243,7 +243,6 @@ public class YarnShuffleService extends AuxiliaryService {
     String appId = context.getApplicationId().toString();
     try {
       ByteBuffer shuffleSecret = context.getApplicationDataForService();
-      logger.info("Initializing application {}", appId);
       if (isAuthenticationEnabled()) {
         AppId fullId = new AppId(appId);
         if (db != null) {
@@ -262,7 +261,6 @@ public class YarnShuffleService extends AuxiliaryService {
   public void stopApplication(ApplicationTerminationContext context) {
     String appId = context.getApplicationId().toString();
     try {
-      logger.info("Stopping application {}", appId);
       if (isAuthenticationEnabled()) {
         AppId fullId = new AppId(appId);
         if (db != null) {

http://git-wip-us.apache.org/repos/asf/spark/blob/300807c6/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
index 950ebd9..75427b4 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
@@ -26,7 +26,9 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.scalatest.Matchers
 
 import org.apache.spark._
+import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
 import org.apache.spark.network.shuffle.ShuffleTestAccessor
 import org.apache.spark.network.yarn.{YarnShuffleService, YarnTestAccessor}
 import org.apache.spark.tags.ExtendedYarnTest
@@ -46,28 +48,58 @@ class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite {
     yarnConfig
   }
 
+  protected def extraSparkConf(): Map[String, String] = {
+    val shuffleServicePort = YarnTestAccessor.getShuffleServicePort
+    val shuffleService = YarnTestAccessor.getShuffleServiceInstance
+    logInfo("Shuffle service port = " + shuffleServicePort)
+
+    Map(
+      "spark.shuffle.service.enabled" -> "true",
+      "spark.shuffle.service.port" -> shuffleServicePort.toString,
+      MAX_EXECUTOR_FAILURES.key -> "1"
+    )
+  }
+
   test("external shuffle service") {
     val shuffleServicePort = YarnTestAccessor.getShuffleServicePort
     val shuffleService = YarnTestAccessor.getShuffleServiceInstance
 
     val registeredExecFile = YarnTestAccessor.getRegisteredExecutorFile(shuffleService)
 
-    logInfo("Shuffle service port = " + shuffleServicePort)
     val result = File.createTempFile("result", null, tempDir)
     val finalState = runSpark(
       false,
       mainClassName(YarnExternalShuffleDriver.getClass),
       appArgs = Seq(result.getAbsolutePath(), registeredExecFile.getAbsolutePath),
-      extraConf = Map(
-        "spark.shuffle.service.enabled" -> "true",
-        "spark.shuffle.service.port" -> shuffleServicePort.toString
-      )
+      extraConf = extraSparkConf()
     )
     checkResult(finalState, result)
     assert(YarnTestAccessor.getRegisteredExecutorFile(shuffleService).exists())
   }
 }
 
+/**
+ * Integration test for the external shuffle service with auth on.
+ */
+@ExtendedYarnTest
+class YarnShuffleAuthSuite extends YarnShuffleIntegrationSuite {
+
+  override def newYarnConfig(): YarnConfiguration = {
+    val yarnConfig = super.newYarnConfig()
+    yarnConfig.set(NETWORK_AUTH_ENABLED.key, "true")
+    yarnConfig.set(NETWORK_ENCRYPTION_ENABLED.key, "true")
+    yarnConfig
+  }
+
+  override protected def extraSparkConf(): Map[String, String] = {
+    super.extraSparkConf() ++ Map(
+      NETWORK_AUTH_ENABLED.key -> "true",
+      NETWORK_ENCRYPTION_ENABLED.key -> "true"
+    )
+  }
+
+}
+
 private object YarnExternalShuffleDriver extends Logging with Matchers {
 
   val WAIT_TIMEOUT_MILLIS = 10000


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org