You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2014/10/03 21:58:09 UTC

git commit: [SPARK-3535][Mesos] Fix resource handling.

Repository: spark
Updated Branches:
  refs/heads/master 6a1d48f4f -> a8c52d534


[SPARK-3535][Mesos] Fix resource handling.

Author: Brenden Matthews <br...@diddyinc.com>

Closes #2401 from brndnmtthws/master and squashes the following commits:

4abaa5d [Brenden Matthews] [SPARK-3535][Mesos] Fix resource handling.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a8c52d53
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a8c52d53
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a8c52d53

Branch: refs/heads/master
Commit: a8c52d5343e19731909e73db5de151a324d31cd5
Parents: 6a1d48f
Author: Brenden Matthews <br...@diddyinc.com>
Authored: Fri Oct 3 12:58:04 2014 -0700
Committer: Andrew Or <an...@gmail.com>
Committed: Fri Oct 3 12:58:04 2014 -0700

----------------------------------------------------------------------
 .../mesos/CoarseMesosSchedulerBackend.scala     |  7 ++--
 .../scheduler/cluster/mesos/MemoryUtils.scala   | 35 ++++++++++++++++++++
 .../cluster/mesos/MesosSchedulerBackend.scala   | 34 +++++++++++++++----
 docs/configuration.md                           | 11 ++++++
 4 files changed, 79 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a8c52d53/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 6456840..3161f1e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -198,7 +198,9 @@ private[spark] class CoarseMesosSchedulerBackend(
         val slaveId = offer.getSlaveId.toString
         val mem = getResource(offer.getResourcesList, "mem")
         val cpus = getResource(offer.getResourcesList, "cpus").toInt
-        if (totalCoresAcquired < maxCores && mem >= sc.executorMemory && cpus >= 1 &&
+        if (totalCoresAcquired < maxCores &&
+            mem >= MemoryUtils.calculateTotalMemory(sc) &&
+            cpus >= 1 &&
             failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
             !slaveIdsWithExecutors.contains(slaveId)) {
           // Launch an executor on the slave
@@ -214,7 +216,8 @@ private[spark] class CoarseMesosSchedulerBackend(
             .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave))
             .setName("Task " + taskId)
             .addResources(createResource("cpus", cpusToUse))
-            .addResources(createResource("mem", sc.executorMemory))
+            .addResources(createResource("mem",
+              MemoryUtils.calculateTotalMemory(sc)))
             .build()
           d.launchTasks(
             Collections.singleton(offer.getId),  Collections.singletonList(task), filters)

http://git-wip-us.apache.org/repos/asf/spark/blob/a8c52d53/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala
new file mode 100644
index 0000000..5101ec8
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import org.apache.spark.SparkContext
+
+private[spark] object MemoryUtils {
+  // These defaults copied from YARN
+  val OVERHEAD_FRACTION = 1.07
+  val OVERHEAD_MINIMUM = 384
+
+  def calculateTotalMemory(sc: SparkContext) = {
+    math.max(
+      sc.conf.getOption("spark.mesos.executor.memoryOverhead")
+        .getOrElse(OVERHEAD_MINIMUM.toString)
+        .toInt + sc.executorMemory,
+        OVERHEAD_FRACTION * sc.executorMemory
+    )
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a8c52d53/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index a9ef126..4c49aa0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -124,15 +124,24 @@ private[spark] class MesosSchedulerBackend(
       command.setValue("cd %s*; ./sbin/spark-executor".format(basename))
       command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
     }
+    val cpus = Resource.newBuilder()
+      .setName("cpus")
+      .setType(Value.Type.SCALAR)
+      .setScalar(Value.Scalar.newBuilder()
+        .setValue(scheduler.CPUS_PER_TASK).build())
+      .build()
     val memory = Resource.newBuilder()
       .setName("mem")
       .setType(Value.Type.SCALAR)
-      .setScalar(Value.Scalar.newBuilder().setValue(sc.executorMemory).build())
+      .setScalar(
+        Value.Scalar.newBuilder()
+          .setValue(MemoryUtils.calculateTotalMemory(sc)).build())
       .build()
     ExecutorInfo.newBuilder()
       .setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
       .setCommand(command)
       .setData(ByteString.copyFrom(createExecArg()))
+      .addResources(cpus)
       .addResources(memory)
       .build()
   }
@@ -204,18 +213,31 @@ private[spark] class MesosSchedulerBackend(
         val offerableWorkers = new ArrayBuffer[WorkerOffer]
         val offerableIndices = new HashMap[String, Int]
 
-        def enoughMemory(o: Offer) = {
+        def sufficientOffer(o: Offer) = {
           val mem = getResource(o.getResourcesList, "mem")
+          val cpus = getResource(o.getResourcesList, "cpus")
           val slaveId = o.getSlaveId.getValue
-          mem >= sc.executorMemory || slaveIdsWithExecutors.contains(slaveId)
+          (mem >= MemoryUtils.calculateTotalMemory(sc) &&
+            // need at least 1 for executor, 1 for task
+            cpus >= 2 * scheduler.CPUS_PER_TASK) ||
+            (slaveIdsWithExecutors.contains(slaveId) &&
+              cpus >= scheduler.CPUS_PER_TASK)
         }
 
-        for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) {
-          offerableIndices.put(offer.getSlaveId.getValue, index)
+        for ((offer, index) <- offers.zipWithIndex if sufficientOffer(offer)) {
+          val slaveId = offer.getSlaveId.getValue
+          offerableIndices.put(slaveId, index)
+          val cpus = if (slaveIdsWithExecutors.contains(slaveId)) {
+            getResource(offer.getResourcesList, "cpus").toInt
+          } else {
+            // If the executor doesn't exist yet, subtract CPU for executor
+            getResource(offer.getResourcesList, "cpus").toInt -
+              scheduler.CPUS_PER_TASK
+          }
           offerableWorkers += new WorkerOffer(
             offer.getSlaveId.getValue,
             offer.getHostname,
-            getResource(offer.getResourcesList, "cpus").toInt)
+            cpus)
         }
 
         // Call into the TaskSchedulerImpl

http://git-wip-us.apache.org/repos/asf/spark/blob/a8c52d53/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index a782809..1c33855 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -253,6 +253,17 @@ Apart from these, the following properties are also available, and may be useful
     <code>spark.executor.uri</code>.
   </td>
 </tr>
+<tr>
+  <td><code>spark.mesos.executor.memoryOverhead</code></td>
+  <td>executor memory * 0.07, with minimum of 384</td>
+  <td>
+    This value, specified in MiB, is added to <code>spark.executor.memory</code>
+    to calculate the total Mesos task memory. A value of <code>384</code>
+    implies a 384MiB overhead. Additionally, there is a hard-coded 7% minimum
+    overhead. The final overhead will be the larger of either
+    <code>spark.mesos.executor.memoryOverhead</code> or 7% of <code>spark.executor.memory</code>.
+  </td>
+</tr>
 </table>
 
 #### Shuffle Behavior


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org