You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2014/10/03 21:58:09 UTC
git commit: [SPARK-3535][Mesos] Fix resource handling.
Repository: spark
Updated Branches:
refs/heads/master 6a1d48f4f -> a8c52d534
[SPARK-3535][Mesos] Fix resource handling.
Author: Brenden Matthews <br...@diddyinc.com>
Closes #2401 from brndnmtthws/master and squashes the following commits:
4abaa5d [Brenden Matthews] [SPARK-3535][Mesos] Fix resource handling.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a8c52d53
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a8c52d53
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a8c52d53
Branch: refs/heads/master
Commit: a8c52d5343e19731909e73db5de151a324d31cd5
Parents: 6a1d48f
Author: Brenden Matthews <br...@diddyinc.com>
Authored: Fri Oct 3 12:58:04 2014 -0700
Committer: Andrew Or <an...@gmail.com>
Committed: Fri Oct 3 12:58:04 2014 -0700
----------------------------------------------------------------------
.../mesos/CoarseMesosSchedulerBackend.scala | 7 ++--
.../scheduler/cluster/mesos/MemoryUtils.scala | 35 ++++++++++++++++++++
.../cluster/mesos/MesosSchedulerBackend.scala | 34 +++++++++++++++----
docs/configuration.md | 11 ++++++
4 files changed, 79 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/a8c52d53/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 6456840..3161f1e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -198,7 +198,9 @@ private[spark] class CoarseMesosSchedulerBackend(
val slaveId = offer.getSlaveId.toString
val mem = getResource(offer.getResourcesList, "mem")
val cpus = getResource(offer.getResourcesList, "cpus").toInt
- if (totalCoresAcquired < maxCores && mem >= sc.executorMemory && cpus >= 1 &&
+ if (totalCoresAcquired < maxCores &&
+ mem >= MemoryUtils.calculateTotalMemory(sc) &&
+ cpus >= 1 &&
failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
!slaveIdsWithExecutors.contains(slaveId)) {
// Launch an executor on the slave
@@ -214,7 +216,8 @@ private[spark] class CoarseMesosSchedulerBackend(
.setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave))
.setName("Task " + taskId)
.addResources(createResource("cpus", cpusToUse))
- .addResources(createResource("mem", sc.executorMemory))
+ .addResources(createResource("mem",
+ MemoryUtils.calculateTotalMemory(sc)))
.build()
d.launchTasks(
Collections.singleton(offer.getId), Collections.singletonList(task), filters)
http://git-wip-us.apache.org/repos/asf/spark/blob/a8c52d53/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala
new file mode 100644
index 0000000..5101ec8
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import org.apache.spark.SparkContext
+
+private[spark] object MemoryUtils { // Computes the "mem" amount the Mesos backends request per executor.
+ // These defaults are copied from YARN.
+ val OVERHEAD_FRACTION = 1.07 // multiplier applied to sc.executorMemory, i.e. executor memory plus 7%
+ val OVERHEAD_MINIMUM = 384 // overhead used when spark.mesos.executor.memoryOverhead is not set
+
+ def calculateTotalMemory(sc: SparkContext) = { // returns the larger of the two candidates below
+ math.max(
+ sc.conf.getOption("spark.mesos.executor.memoryOverhead")
+ .getOrElse(OVERHEAD_MINIMUM.toString)
+ .toInt + sc.executorMemory, // candidate 1: executor memory plus the configured (or default) overhead
+ OVERHEAD_FRACTION * sc.executorMemory // candidate 2: executor memory scaled by OVERHEAD_FRACTION
+ )
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/a8c52d53/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index a9ef126..4c49aa0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -124,15 +124,24 @@ private[spark] class MesosSchedulerBackend(
command.setValue("cd %s*; ./sbin/spark-executor".format(basename))
command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
}
+ val cpus = Resource.newBuilder()
+ .setName("cpus")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Value.Scalar.newBuilder()
+ .setValue(scheduler.CPUS_PER_TASK).build())
+ .build()
val memory = Resource.newBuilder()
.setName("mem")
.setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder().setValue(sc.executorMemory).build())
+ .setScalar(
+ Value.Scalar.newBuilder()
+ .setValue(MemoryUtils.calculateTotalMemory(sc)).build())
.build()
ExecutorInfo.newBuilder()
.setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
.setCommand(command)
.setData(ByteString.copyFrom(createExecArg()))
+ .addResources(cpus)
.addResources(memory)
.build()
}
@@ -204,18 +213,31 @@ private[spark] class MesosSchedulerBackend(
val offerableWorkers = new ArrayBuffer[WorkerOffer]
val offerableIndices = new HashMap[String, Int]
- def enoughMemory(o: Offer) = {
+ def sufficientOffer(o: Offer) = {
val mem = getResource(o.getResourcesList, "mem")
+ val cpus = getResource(o.getResourcesList, "cpus")
val slaveId = o.getSlaveId.getValue
- mem >= sc.executorMemory || slaveIdsWithExecutors.contains(slaveId)
+ (mem >= MemoryUtils.calculateTotalMemory(sc) &&
+ // need at least 1 for executor, 1 for task
+ cpus >= 2 * scheduler.CPUS_PER_TASK) ||
+ (slaveIdsWithExecutors.contains(slaveId) &&
+ cpus >= scheduler.CPUS_PER_TASK)
}
- for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) {
- offerableIndices.put(offer.getSlaveId.getValue, index)
+ for ((offer, index) <- offers.zipWithIndex if sufficientOffer(offer)) {
+ val slaveId = offer.getSlaveId.getValue
+ offerableIndices.put(slaveId, index)
+ val cpus = if (slaveIdsWithExecutors.contains(slaveId)) {
+ getResource(offer.getResourcesList, "cpus").toInt
+ } else {
+ // If the executor doesn't exist yet, subtract CPU for executor
+ getResource(offer.getResourcesList, "cpus").toInt -
+ scheduler.CPUS_PER_TASK
+ }
offerableWorkers += new WorkerOffer(
offer.getSlaveId.getValue,
offer.getHostname,
- getResource(offer.getResourcesList, "cpus").toInt)
+ cpus)
}
// Call into the TaskSchedulerImpl
http://git-wip-us.apache.org/repos/asf/spark/blob/a8c52d53/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index a782809..1c33855 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -253,6 +253,17 @@ Apart from these, the following properties are also available, and may be useful
<code>spark.executor.uri</code>.
</td>
</tr>
+<tr>
+ <td><code>spark.mesos.executor.memoryOverhead</code></td>
+ <td>executor memory * 0.07, with minimum of 384</td>
+ <td>
+ The amount of memory, specified in MiB, that is added to <code>spark.executor.memory</code>
+ to calculate the total Mesos task memory. A value of <code>384</code>
+ implies a 384MiB overhead. Additionally, there is a hard-coded 7% minimum
+ overhead. The final overhead will be the larger of
+ <code>spark.mesos.executor.memoryOverhead</code> or 7% of <code>spark.executor.memory</code>.
+ </td>
+</tr>
</table>
#### Shuffle Behavior
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org