You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2017/10/09 19:56:42 UTC

spark git commit: [SPARK-22218] spark shuffle services fails to update secret on app re-attempts

Repository: spark
Updated Branches:
  refs/heads/master f31e11404 -> a74ec6d7b


[SPARK-22218] spark shuffle services fails to update secret on app re-attempts

This patch fixes application re-attempts when running Spark on YARN using the external shuffle service with security on. Currently, executors fail to launch on any application re-attempt when launched on a NodeManager that had an executor from the first attempt. This is because we weren't updating the secret key after the first application attempt. The fix is simply to remove the containsKey check for an existing entry; this way we always add the secret and make sure it's the most recent one. Similarly, remove the containsKey check before the remove, since it's just an extra check that isn't needed.

Note that this worked before Spark 2.2 because the check used to be contains (which looks for the value) rather than containsKey, so it never matched and the new secret was always added.

The patch was tested on a 10-node cluster, and a unit test was added.
The test run was a wordcount job whose output directory already existed. With the bug present, the application attempt failed after hitting the maximum number of executor failures, all of which were SaslExceptions. With the fix present, the application re-attempts fail because the directory already exists, or, if the directory is removed between attempts, the re-attempts succeed.

Author: Thomas Graves <tg...@unharmedunarmed.corp.ne1.yahoo.com>

Closes #19450 from tgravescs/SPARK-22218.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a74ec6d7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a74ec6d7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a74ec6d7

Branch: refs/heads/master
Commit: a74ec6d7bbfe185ba995dcb02d69e90a089c293e
Parents: f31e114
Author: Thomas Graves <tg...@unharmedunarmed.corp.ne1.yahoo.com>
Authored: Mon Oct 9 12:56:37 2017 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Mon Oct 9 12:56:37 2017 -0700

----------------------------------------------------------------------
 .../network/sasl/ShuffleSecretManager.java      | 19 +++----
 .../network/sasl/ShuffleSecretManagerSuite.java | 55 ++++++++++++++++++++
 2 files changed, 62 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a74ec6d7/common/network-shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java b/common/network-shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java
index d2d008f..7253101 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java
@@ -47,12 +47,11 @@ public class ShuffleSecretManager implements SecretKeyHolder {
    * fetching shuffle files written by other executors in this application.
    */
   public void registerApp(String appId, String shuffleSecret) {
-    if (!shuffleSecretMap.containsKey(appId)) {
-      shuffleSecretMap.put(appId, shuffleSecret);
-      logger.info("Registered shuffle secret for application {}", appId);
-    } else {
-      logger.debug("Application {} already registered", appId);
-    }
+    // Always overwrite any existing entry: on YARN the secret changes between application
+    // attempts, so keeping the first-registered secret for this appId would make executors
+    // of a later attempt fail SASL authentication against this shuffle service.
+    shuffleSecretMap.put(appId, shuffleSecret);
+    logger.info("Registered shuffle secret for application {}", appId);
   }
 
   /**
@@ -67,12 +66,8 @@ public class ShuffleSecretManager implements SecretKeyHolder {
    * This is called when the application terminates.
    */
   public void unregisterApp(String appId) {
-    if (shuffleSecretMap.containsKey(appId)) {
-      shuffleSecretMap.remove(appId);
-      logger.info("Unregistered shuffle secret for application {}", appId);
-    } else {
-      logger.warn("Attempted to unregister application {} when it is not registered", appId);
-    }
+    shuffleSecretMap.remove(appId);  // no containsKey guard: removing an absent key is a no-op
+    logger.info("Unregistered shuffle secret for application {}", appId);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/a74ec6d7/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/ShuffleSecretManagerSuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/ShuffleSecretManagerSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/ShuffleSecretManagerSuite.java
new file mode 100644
index 0000000..46c4c33
--- /dev/null
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/ShuffleSecretManagerSuite.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.sasl;
+
+import java.nio.ByteBuffer;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class ShuffleSecretManagerSuite {
+  private static final String APP1 = "app1";
+  private static final String APP2 = "app2";
+  private static final String PW1 = "password1";
+  private static final String PW2 = "password2";
+  private static final String PW1_UPDATE = "password1update";
+  private static final String PW2_UPDATE = "password2update";
+
+  @Test
+  public void testMultipleRegisters() {
+    ShuffleSecretManager secretManager = new ShuffleSecretManager();
+    secretManager.registerApp(APP1, PW1);
+    assertEquals(PW1, secretManager.getSecretKey(APP1));
+    secretManager.registerApp(APP2, ByteBuffer.wrap(PW2.getBytes()));
+    assertEquals(PW2, secretManager.getSecretKey(APP2));
+
+    // Re-register both apps with new secrets (as on an app re-attempt); the new ones must win.
+    secretManager.registerApp(APP1, PW1_UPDATE);
+    assertEquals(PW1_UPDATE, secretManager.getSecretKey(APP1));
+    secretManager.registerApp(APP2, ByteBuffer.wrap(PW2_UPDATE.getBytes()));
+    assertEquals(PW2_UPDATE, secretManager.getSecretKey(APP2));
+    // Unregistering one app must drop only that app's secret.
+    secretManager.unregisterApp(APP1);
+    assertNull(secretManager.getSecretKey(APP1));
+    assertEquals(PW2_UPDATE, secretManager.getSecretKey(APP2));
+    // After unregistering the second app, no secrets remain.
+    secretManager.unregisterApp(APP2);
+    assertNull(secretManager.getSecretKey(APP2));
+    assertNull(secretManager.getSecretKey(APP1));
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org