Posted to commits@spark.apache.org by an...@apache.org on 2014/11/07 02:20:52 UTC

spark git commit: [SPARK-4277] Support external shuffle service on Standalone Worker

Repository: spark
Updated Branches:
  refs/heads/master 96136f222 -> 6e9ef10fd


[SPARK-4277] Support external shuffle service on Standalone Worker

Author: Aaron Davidson <aa...@databricks.com>

Closes #3142 from aarondav/worker and squashes the following commits:

3780bd7 [Aaron Davidson] Address comments
2dcdfc1 [Aaron Davidson] Add private[worker]
47f49d3 [Aaron Davidson] NettyBlockTransferService shouldn't care about app ids (it's only b/t executors)
258417c [Aaron Davidson] [SPARK-4277] Support external shuffle service on executor


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6e9ef10f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6e9ef10f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6e9ef10f

Branch: refs/heads/master
Commit: 6e9ef10fd7446a11f37446c961916ba2a8e02cb8
Parents: 96136f2
Author: Aaron Davidson <aa...@databricks.com>
Authored: Thu Nov 6 17:20:46 2014 -0800
Committer: Andrew Or <an...@databricks.com>
Committed: Thu Nov 6 17:20:46 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/SecurityManager.scala      | 14 +----
 .../worker/StandaloneWorkerShuffleService.scala | 66 ++++++++++++++++++++
 .../org/apache/spark/deploy/worker/Worker.scala |  8 ++-
 .../storage/ShuffleBlockFetcherIterator.scala   |  2 +-
 .../netty/NettyBlockTransferSecuritySuite.scala | 12 ----
 .../apache/spark/network/sasl/SaslMessage.java  |  3 +-
 6 files changed, 79 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6e9ef10f/core/src/main/scala/org/apache/spark/SecurityManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index dee935f..dbff9d1 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -343,15 +343,7 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging with
    */
   def getSecretKey(): String = secretKey
 
-  override def getSaslUser(appId: String): String = {
-    val myAppId = sparkConf.getAppId
-    require(appId == myAppId, s"SASL appId $appId did not match my appId ${myAppId}")
-    getSaslUser()
-  }
-
-  override def getSecretKey(appId: String): String = {
-    val myAppId = sparkConf.getAppId
-    require(appId == myAppId, s"SASL appId $appId did not match my appId ${myAppId}")
-    getSecretKey()
-  }
+  // Default SecurityManager only has a single secret key, so ignore appId.
+  override def getSaslUser(appId: String): String = getSaslUser()
+  override def getSecretKey(appId: String): String = getSecretKey()
 }
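
The removed checks made the default SecurityManager reject any appId other than its own; since it holds a single cluster-wide secret, the per-app lookups can simply delegate, as the replacement comment says. For contrast, a holder that genuinely serves multiple applications (as a long-running shared shuffle service must) would key secrets by appId. A minimal sketch, assuming only the two-method contract implied by the overrides above; the trait and class below are illustrative, not Spark's actual interface:

    // Hypothetical sketch: a key holder that tracks per-application secrets,
    // in contrast to the single-secret default SecurityManager above.
    trait AppSecretKeyHolder {
      def getSaslUser(appId: String): String
      def getSecretKey(appId: String): String
    }

    class MultiAppSecretKeyHolder extends AppSecretKeyHolder {
      private val secrets = new java.util.concurrent.ConcurrentHashMap[String, String]()

      // Called when an application registers with the shared service.
      def registerApp(appId: String, secret: String): Unit = {
        secrets.put(appId, secret)
      }

      override def getSaslUser(appId: String): String = "sparkSaslUser"

      override def getSecretKey(appId: String): String = {
        val secret = secrets.get(appId)
        require(secret != null, s"Unknown appId: $appId")
        secret
      }
    }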

http://git-wip-us.apache.org/repos/asf/spark/blob/6e9ef10f/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala
new file mode 100644
index 0000000..88118e2
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.worker
+
+import org.apache.spark.{Logging, SparkConf, SecurityManager}
+import org.apache.spark.network.TransportContext
+import org.apache.spark.network.netty.SparkTransportConf
+import org.apache.spark.network.sasl.SaslRpcHandler
+import org.apache.spark.network.server.TransportServer
+import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
+
+/**
+ * Provides a server from which Executors can read shuffle files (rather than reading directly from
+ * each other), to provide uninterrupted access to the files in the face of executors being turned
+ * off or killed.
+ *
+ * Optionally requires SASL authentication in order to read. See [[SecurityManager]].
+ */
+private[worker]
+class StandaloneWorkerShuffleService(sparkConf: SparkConf, securityManager: SecurityManager)
+  extends Logging {
+
+  private val enabled = sparkConf.getBoolean("spark.shuffle.service.enabled", false)
+  private val port = sparkConf.getInt("spark.shuffle.service.port", 7337)
+  private val useSasl: Boolean = securityManager.isAuthenticationEnabled()
+
+  private val transportConf = SparkTransportConf.fromSparkConf(sparkConf)
+  private val blockHandler = new ExternalShuffleBlockHandler()
+  private val transportContext: TransportContext = {
+    val handler = if (useSasl) new SaslRpcHandler(blockHandler, securityManager) else blockHandler
+    new TransportContext(transportConf, handler)
+  }
+
+  private var server: TransportServer = _
+
+  /** Starts the external shuffle service if the user has configured us to. */
+  def startIfEnabled() {
+    if (enabled) {
+      require(server == null, "Shuffle server already started")
+      logInfo(s"Starting shuffle service on port $port with useSasl = $useSasl")
+      server = transportContext.createServer(port)
+    }
+  }
+
+  def stop() {
+    if (enabled && server != null) {
+      server.close()
+      server = null
+    }
+  }
+}
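
The service reads two settings: spark.shuffle.service.enabled (default false) gates startIfEnabled(), and spark.shuffle.service.port (default 7337) picks the listening port. A minimal sketch of driving it directly, assuming only the constructor and lifecycle methods above and ignoring the private[worker] visibility for the sake of illustration:

    import org.apache.spark.{SecurityManager, SparkConf}

    val conf = new SparkConf()
      .set("spark.shuffle.service.enabled", "true") // otherwise startIfEnabled() is a no-op
      .set("spark.shuffle.service.port", "7337")    // the default shown above

    val service = new StandaloneWorkerShuffleService(conf, new SecurityManager(conf))
    service.startIfEnabled() // binds a TransportServer on the configured port
    // ... serve shuffle blocks to executors until shutdown ...
    service.stop()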

http://git-wip-us.apache.org/repos/asf/spark/blob/6e9ef10f/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index f1f66d0..ca262de 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -111,6 +111,9 @@ private[spark] class Worker(
   val drivers = new HashMap[String, DriverRunner]
   val finishedDrivers = new HashMap[String, DriverRunner]
 
+  // The shuffle service is not actually started unless configured.
+  val shuffleService = new StandaloneWorkerShuffleService(conf, securityMgr)
+
   val publicAddress = {
     val envVar = System.getenv("SPARK_PUBLIC_DNS")
     if (envVar != null) envVar else host
@@ -154,6 +157,7 @@ private[spark] class Worker(
     logInfo("Spark home: " + sparkHome)
     createWorkDir()
     context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+    shuffleService.startIfEnabled()
     webUi = new WorkerWebUI(this, workDir, webUiPort)
     webUi.bind()
     registerWithMaster()
@@ -419,6 +423,7 @@ private[spark] class Worker(
     registrationRetryTimer.foreach(_.cancel())
     executors.values.foreach(_.kill())
     drivers.values.foreach(_.kill())
+    shuffleService.stop()
     webUi.stop()
     metricsSystem.stop()
   }
@@ -441,7 +446,8 @@ private[spark] object Worker extends Logging {
       cores: Int,
       memory: Int,
       masterUrls: Array[String],
-      workDir: String, workerNumber: Option[Int] = None): (ActorSystem, Int) = {
+      workDir: String,
+      workerNumber: Option[Int] = None): (ActorSystem, Int) = {
 
     // The LocalSparkCluster runs multiple local sparkWorkerX actor systems
     val conf = new SparkConf
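
Taken together, the three hunks above wire the service into the Worker's lifecycle: constructed eagerly, bound in preStart before the web UI, and closed in postStop alongside executors and drivers. Condensed into one view (Actor plumbing elided; names match the diff):

    import org.apache.spark.{SecurityManager, SparkConf}
    import org.apache.spark.deploy.worker.StandaloneWorkerShuffleService

    class WorkerLifecycleSketch(conf: SparkConf, securityMgr: SecurityManager) {
      // Constructed unconditionally; the port is bound only if enabled.
      val shuffleService = new StandaloneWorkerShuffleService(conf, securityMgr)

      def preStart(): Unit = shuffleService.startIfEnabled() // no-op unless enabled
      def postStop(): Unit = shuffleService.stop()           // safe even if never started
    }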

http://git-wip-us.apache.org/repos/asf/spark/blob/6e9ef10f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 1e57918..6b1f57a 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -92,7 +92,7 @@ final class ShuffleBlockFetcherIterator(
    * Current [[FetchResult]] being processed. We track this so we can release the current buffer
    * in case of a runtime exception when processing the current buffer.
    */
-  private[this] var currentResult: FetchResult = null
+  @volatile private[this] var currentResult: FetchResult = null
 
   /**
    * Queue of fetch requests to issue; we'll pull requests off this gradually to make sure that
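
Marking currentResult @volatile matters because, per the scaladoc above, the buffer it points to may be released on failure, and that release can happen on a different thread than the one driving the iterator; without the annotation, the releasing thread could act on a stale reference. A minimal, self-contained illustration of the visibility guarantee (the names are illustrative, not Spark's):

    object VolatileVisibility {
      @volatile private var currentResult: AnyRef = null

      def main(args: Array[String]): Unit = {
        val cleaner = new Thread(new Runnable {
          def run(): Unit = {
            while (currentResult == null) {}     // spins until the write below is visible
            println(s"releasing $currentResult")
          }
        })
        cleaner.start()
        currentResult = "fetch-result-1"         // safely published to the cleaner thread
        cleaner.join()
      }
    }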

http://git-wip-us.apache.org/repos/asf/spark/blob/6e9ef10f/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
index bed0ed9..9162ec9 100644
--- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
+++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
@@ -89,18 +89,6 @@ class NettyBlockTransferSecuritySuite extends FunSuite with MockitoSugar with Sh
     }
   }
 
-  test("security mismatch app ids") {
-    val conf0 = new SparkConf()
-      .set("spark.authenticate", "true")
-      .set("spark.authenticate.secret", "good")
-      .set("spark.app.id", "app-id")
-    val conf1 = conf0.clone.set("spark.app.id", "other-id")
-    testConnection(conf0, conf1) match {
-      case Success(_) => fail("Should have failed")
-      case Failure(t) => t.getMessage should include ("SASL appId app-id did not match")
-    }
-  }
-
   /**
    * Creates two servers with different configurations and sees if they can talk.
   * Returns Success() if they can transfer a block, and Failure() if the block transfer failed

http://git-wip-us.apache.org/repos/asf/spark/blob/6e9ef10f/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslMessage.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslMessage.java b/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslMessage.java
index 5b77e18..599cc64 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslMessage.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslMessage.java
@@ -58,7 +58,8 @@ class SaslMessage implements Encodable {
 
   public static SaslMessage decode(ByteBuf buf) {
     if (buf.readByte() != TAG_BYTE) {
-      throw new IllegalStateException("Expected SaslMessage, received something else");
+      throw new IllegalStateException("Expected SaslMessage, received something else"
+        + " (maybe your client does not have SASL enabled?)");
     }
 
     int idLength = buf.readInt();
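
The guard at the top of decode() is the wire format's sanity check: every SASL frame begins with a fixed tag byte, followed (as this method reads it) by a length-prefixed appId. The hunk only improves the error message for the common misconfiguration where one side skips SASL. A standalone sketch of the same framing pattern in Scala on Netty's ByteBuf; the object, tag value, and method names are illustrative, not Spark's:

    import java.nio.charset.StandardCharsets.UTF_8
    import io.netty.buffer.{ByteBuf, Unpooled}

    object TaggedFrame {
      val TagByte: Byte = 0xEA.toByte // placeholder; the real value lives in SaslMessage

      def encode(appId: String): ByteBuf = {
        val idBytes = appId.getBytes(UTF_8)
        val buf = Unpooled.buffer(1 + 4 + idBytes.length)
        buf.writeByte(TagByte).writeInt(idBytes.length).writeBytes(idBytes)
      }

      def decode(buf: ByteBuf): String = {
        if (buf.readByte() != TagByte) {
          throw new IllegalStateException("Expected SaslMessage, received something else"
            + " (maybe your client does not have SASL enabled?)")
        }
        val idBytes = new Array[Byte](buf.readInt())
        buf.readBytes(idBytes)
        new String(idBytes, UTF_8)
      }
    }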

