You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2017/01/25 22:31:13 UTC

[1/2] spark git commit: [SPARK-18750][YARN] Avoid using "mapValues" when allocating containers.

Repository: spark
Updated Branches:
  refs/heads/branch-2.0 886f73737 -> 00a48075a


[SPARK-18750][YARN] Avoid using "mapValues" when allocating containers.

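That method is prone to stack overflows when the input map is really
large, because it returns a lazy view and the allocation loop re-wraps
that view on every iteration; instead, use plain "map", which builds a
strict map. Also includes a unit test that was confirmed to cause a
stack overflow without the fix.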

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #16667 from vanzin/SPARK-18750.

(cherry picked from commit 76db394f2baedc2c7b7a52c05314a64ec9068263)
Signed-off-by: Marcelo Vanzin <va...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2d9e8d5e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2d9e8d5e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2d9e8d5e

Branch: refs/heads/branch-2.0
Commit: 2d9e8d5e90ae7365f38923822df7f521200dc7bc
Parents: 886f737
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Wed Jan 25 08:18:41 2017 -0600
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Wed Jan 25 10:51:11 2017 -0800

----------------------------------------------------------------------
 .../yarn/LocalityPlacementStrategySuite.scala   | 87 ++++++++++++++++++++
 ...ityPreferredContainerPlacementStrategy.scala | 11 +--
 2 files changed, 93 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2d9e8d5e/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala
new file mode 100644
index 0000000..fb80ff9
--- /dev/null
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable.{HashMap, HashSet, Set}
+
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic
+import org.apache.hadoop.net.DNSToSwitchMapping
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.mockito.Mockito._
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+
+class LocalityPlacementStrategySuite extends SparkFunSuite {
+
+  test("handle large number of containers and tasks (SPARK-18750)") {
+    // Run the test in a thread with a small stack size, since the original issue
+    // surfaced as a StackOverflowError.
+    var error: Throwable = null
+
+    val runnable = new Runnable() {
+      override def run(): Unit = try {
+        runTest()
+      } catch {
+        case e: Throwable => error = e
+      }
+    }
+
+    val thread = new Thread(new ThreadGroup("test"), runnable, "test-thread", 32 * 1024)
+    thread.start()
+    thread.join()
+
+    assert(error === null)
+  }
+
+  private def runTest(): Unit = {
+    val yarnConf = new YarnConfiguration()
+    yarnConf.setClass(
+      CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+      classOf[MockResolver], classOf[DNSToSwitchMapping])
+
+    // The numbers below have been chosen to balance being large enough to replicate the
+    // original issue while not taking too long to run when the issue is fixed. The main
+    // goal is to create enough requests for localized containers (so there should be many
+    // tasks on several hosts that have no allocated containers).
+
+    val resource = Resource.newInstance(8 * 1024, 4)
+    val strategy = new LocalityPreferredContainerPlacementStrategy(new SparkConf(),
+      yarnConf, resource)
+
+    val totalTasks = 32 * 1024
+    val totalContainers = totalTasks / 16
+    val totalHosts = totalContainers / 16
+
+    val mockId = mock(classOf[ContainerId])
+    val hosts = (1 to totalHosts).map { i => (s"host_$i", totalTasks % i) }.toMap
+    val containers = (1 to totalContainers).map { i => mockId }
+    val count = containers.size / hosts.size / 2
+
+    val hostToContainerMap = new HashMap[String, Set[ContainerId]]()
+    hosts.keys.take(hosts.size / 2).zipWithIndex.foreach { case (host, i) =>
+      val hostContainers = new HashSet[ContainerId]()
+      containers.drop(count * i).take(i).foreach { c => hostContainers += c }
+      hostToContainerMap(host) = hostContainers
+    }
+
+    strategy.localityOfRequestedContainers(containers.size * 2, totalTasks, hosts,
+      hostToContainerMap, Nil)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/2d9e8d5e/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
index 8772e26..db638d8 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
@@ -129,9 +129,9 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
       val largestRatio = updatedHostToContainerCount.values.max
       // Round the ratio of preferred locality to the number of locality required container
       // number, which is used for locality preferred host calculating.
-      var preferredLocalityRatio = updatedHostToContainerCount.mapValues { ratio =>
+      var preferredLocalityRatio = updatedHostToContainerCount.map { case(k, ratio) =>
         val adjustedRatio = ratio.toDouble * requiredLocalityAwareContainerNum / largestRatio
-        adjustedRatio.ceil.toInt
+        (k, adjustedRatio.ceil.toInt)
       }
 
       for (i <- 0 until requiredLocalityAwareContainerNum) {
@@ -145,7 +145,7 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
 
         // Minus 1 each time when the host is used. When the current ratio is 0,
         // which means all the required ratio is satisfied, this host will not be allocated again.
-        preferredLocalityRatio = preferredLocalityRatio.mapValues(_ - 1)
+        preferredLocalityRatio = preferredLocalityRatio.map { case (k, v) => (k, v - 1) }
       }
     }
 
@@ -218,7 +218,8 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
 
     val possibleTotalContainerNum = pendingHostToContainerCount.values.sum
     val localityMatchedPendingNum = localityMatchedPendingAllocations.size.toDouble
-    pendingHostToContainerCount.mapValues(_ * localityMatchedPendingNum / possibleTotalContainerNum)
-      .toMap
+    pendingHostToContainerCount.map { case (k, v) =>
+      (k, v * localityMatchedPendingNum / possibleTotalContainerNum)
+    }.toMap
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


[2/2] spark git commit: [SPARK-18750][YARN] Follow up: move test to correct directory in 2.1 branch.

Posted by va...@apache.org.
[SPARK-18750][YARN] Follow up: move test to correct directory in 2.1 branch.

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #16704 from vanzin/SPARK-18750_2.1.

(cherry picked from commit 97d3353ef16a6e6edc93d8177b08442a03e19eee)
Signed-off-by: Marcelo Vanzin <va...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/00a48075
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/00a48075
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/00a48075

Branch: refs/heads/branch-2.0
Commit: 00a48075ac2e0d6dbdc7b2632d8702af4f30aa97
Parents: 2d9e8d5
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Wed Jan 25 14:22:58 2017 -0800
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Wed Jan 25 14:23:48 2017 -0800

----------------------------------------------------------------------
 .../yarn/LocalityPlacementStrategySuite.scala   | 87 --------------------
 .../yarn/LocalityPlacementStrategySuite.scala   | 87 ++++++++++++++++++++
 2 files changed, 87 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/00a48075/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala
deleted file mode 100644
index fb80ff9..0000000
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import scala.collection.mutable.{HashMap, HashSet, Set}
-
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic
-import org.apache.hadoop.net.DNSToSwitchMapping
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.mockito.Mockito._
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-
-class LocalityPlacementStrategySuite extends SparkFunSuite {
-
-  test("handle large number of containers and tasks (SPARK-18750)") {
-    // Run the test in a thread with a small stack size, since the original issue
-    // surfaced as a StackOverflowError.
-    var error: Throwable = null
-
-    val runnable = new Runnable() {
-      override def run(): Unit = try {
-        runTest()
-      } catch {
-        case e: Throwable => error = e
-      }
-    }
-
-    val thread = new Thread(new ThreadGroup("test"), runnable, "test-thread", 32 * 1024)
-    thread.start()
-    thread.join()
-
-    assert(error === null)
-  }
-
-  private def runTest(): Unit = {
-    val yarnConf = new YarnConfiguration()
-    yarnConf.setClass(
-      CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
-      classOf[MockResolver], classOf[DNSToSwitchMapping])
-
-    // The numbers below have been chosen to balance being large enough to replicate the
-    // original issue while not taking too long to run when the issue is fixed. The main
-    // goal is to create enough requests for localized containers (so there should be many
-    // tasks on several hosts that have no allocated containers).
-
-    val resource = Resource.newInstance(8 * 1024, 4)
-    val strategy = new LocalityPreferredContainerPlacementStrategy(new SparkConf(),
-      yarnConf, resource)
-
-    val totalTasks = 32 * 1024
-    val totalContainers = totalTasks / 16
-    val totalHosts = totalContainers / 16
-
-    val mockId = mock(classOf[ContainerId])
-    val hosts = (1 to totalHosts).map { i => (s"host_$i", totalTasks % i) }.toMap
-    val containers = (1 to totalContainers).map { i => mockId }
-    val count = containers.size / hosts.size / 2
-
-    val hostToContainerMap = new HashMap[String, Set[ContainerId]]()
-    hosts.keys.take(hosts.size / 2).zipWithIndex.foreach { case (host, i) =>
-      val hostContainers = new HashSet[ContainerId]()
-      containers.drop(count * i).take(i).foreach { c => hostContainers += c }
-      hostToContainerMap(host) = hostContainers
-    }
-
-    strategy.localityOfRequestedContainers(containers.size * 2, totalTasks, hosts,
-      hostToContainerMap, Nil)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/00a48075/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala
new file mode 100644
index 0000000..fb80ff9
--- /dev/null
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable.{HashMap, HashSet, Set}
+
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic
+import org.apache.hadoop.net.DNSToSwitchMapping
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.mockito.Mockito._
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+
+class LocalityPlacementStrategySuite extends SparkFunSuite {
+
+  test("handle large number of containers and tasks (SPARK-18750)") {
+    // Run the test in a thread with a small stack size, since the original issue
+    // surfaced as a StackOverflowError.
+    var error: Throwable = null
+
+    val runnable = new Runnable() {
+      override def run(): Unit = try {
+        runTest()
+      } catch {
+        case e: Throwable => error = e
+      }
+    }
+
+    val thread = new Thread(new ThreadGroup("test"), runnable, "test-thread", 32 * 1024)
+    thread.start()
+    thread.join()
+
+    assert(error === null)
+  }
+
+  private def runTest(): Unit = {
+    val yarnConf = new YarnConfiguration()
+    yarnConf.setClass(
+      CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+      classOf[MockResolver], classOf[DNSToSwitchMapping])
+
+    // The numbers below have been chosen to balance being large enough to replicate the
+    // original issue while not taking too long to run when the issue is fixed. The main
+    // goal is to create enough requests for localized containers (so there should be many
+    // tasks on several hosts that have no allocated containers).
+
+    val resource = Resource.newInstance(8 * 1024, 4)
+    val strategy = new LocalityPreferredContainerPlacementStrategy(new SparkConf(),
+      yarnConf, resource)
+
+    val totalTasks = 32 * 1024
+    val totalContainers = totalTasks / 16
+    val totalHosts = totalContainers / 16
+
+    val mockId = mock(classOf[ContainerId])
+    val hosts = (1 to totalHosts).map { i => (s"host_$i", totalTasks % i) }.toMap
+    val containers = (1 to totalContainers).map { i => mockId }
+    val count = containers.size / hosts.size / 2
+
+    val hostToContainerMap = new HashMap[String, Set[ContainerId]]()
+    hosts.keys.take(hosts.size / 2).zipWithIndex.foreach { case (host, i) =>
+      val hostContainers = new HashSet[ContainerId]()
+      containers.drop(count * i).take(i).foreach { c => hostContainers += c }
+      hostToContainerMap(host) = hostContainers
+    }
+
+    strategy.localityOfRequestedContainers(containers.size * 2, totalTasks, hosts,
+      hostToContainerMap, Nil)
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org