You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2015/06/30 20:12:19 UTC

spark git commit: [SPARK-7988] [STREAMING] Round-robin scheduling of receivers by default

Repository: spark
Updated Branches:
  refs/heads/master 9213f73a8 -> ca7e460f7


[SPARK-7988] [STREAMING] Round-robin scheduling of receivers by default

Minimal PR for round-robin scheduling of receivers. Dense scheduling can be enabled by setting preferredLocation, so a new config parameter isn't really needed. Tested this on a cluster of 6 nodes and noticed 20-25% gain in throughput compared to random scheduling.

tdas pwendell

Author: nishkamravi2 <ni...@gmail.com>
Author: Nishkam Ravi <nr...@cloudera.com>

Closes #6607 from nishkamravi2/master_nravi and squashes the following commits:

1918819 [Nishkam Ravi] Update ReceiverTrackerSuite.scala
f747739 [Nishkam Ravi] Update ReceiverTrackerSuite.scala
6127e58 [Nishkam Ravi] Update ReceiverTracker and ReceiverTrackerSuite
9f1abc2 [nishkamravi2] Update ReceiverTrackerSuite.scala
ae29152 [Nishkam Ravi] Update test suite with TD's suggestions
48a4a97 [nishkamravi2] Update ReceiverTracker.scala
bc23907 [nishkamravi2] Update ReceiverTracker.scala
68e8540 [nishkamravi2] Update SchedulerSuite.scala
4604f28 [nishkamravi2] Update SchedulerSuite.scala
179b90f [nishkamravi2] Update ReceiverTracker.scala
242e677 [nishkamravi2] Update SchedulerSuite.scala
7f3e028 [Nishkam Ravi] Update ReceiverTracker.scala, add unit test cases in SchedulerSuite
f8a3e05 [nishkamravi2] Update ReceiverTracker.scala
4cf97b6 [nishkamravi2] Update ReceiverTracker.scala
16e84ec [Nishkam Ravi] Update ReceiverTracker.scala
45e3a99 [Nishkam Ravi] Merge branch 'master_nravi' of https://github.com/nishkamravi2/spark into master_nravi
02dbdb8 [Nishkam Ravi] Update ReceiverTracker.scala
07b9dfa [nishkamravi2] Update ReceiverTracker.scala
6caeefe [nishkamravi2] Update ReceiverTracker.scala
7888257 [nishkamravi2] Update ReceiverTracker.scala
6e3515c [Nishkam Ravi] Minor changes
975b8d8 [Nishkam Ravi] Merge branch 'master_nravi' of https://github.com/nishkamravi2/spark into master_nravi
3cac21b [Nishkam Ravi] Generalize the scheduling algorithm
b05ee2f [nishkamravi2] Update ReceiverTracker.scala
bb5e09b [Nishkam Ravi] Add a new var in receiver to store location information for round-robin scheduling
41705de [nishkamravi2] Update ReceiverTracker.scala
fff1b2e [Nishkam Ravi] Round-robin scheduling of streaming receivers


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ca7e460f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ca7e460f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ca7e460f

Branch: refs/heads/master
Commit: ca7e460f7d6fb898dc29236a85520bbe954c8a13
Parents: 9213f73
Author: nishkamravi2 <ni...@gmail.com>
Authored: Tue Jun 30 11:12:15 2015 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Tue Jun 30 11:12:15 2015 -0700

----------------------------------------------------------------------
 .../streaming/scheduler/ReceiverTracker.scala   | 64 +++++++++++---
 .../scheduler/ReceiverTrackerSuite.scala        | 90 ++++++++++++++++++++
 2 files changed, 141 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ca7e460f/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index e6cdbec..644e581 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -17,8 +17,10 @@
 
 package org.apache.spark.streaming.scheduler
 
-import scala.collection.mutable.{HashMap, SynchronizedMap}
+import scala.collection.mutable.{ArrayBuffer, HashMap, SynchronizedMap}
 import scala.language.existentials
+import scala.math.max
+import org.apache.spark.rdd._
 
 import org.apache.spark.streaming.util.WriteAheadLogUtils
 import org.apache.spark.{Logging, SparkEnv, SparkException}
@@ -273,6 +275,41 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
     }
 
     /**
+     * Get the list of executors excluding driver
+     */
+    private def getExecutors(ssc: StreamingContext): List[String] = {
+      val executors = ssc.sparkContext.getExecutorMemoryStatus.map(_._1.split(":")(0)).toList
+      val driver = ssc.sparkContext.getConf.get("spark.driver.host")
+      executors.diff(List(driver))
+    }
+
+    /** Set host location(s) for each receiver so as to distribute them over
+     * executors in a round-robin fashion taking into account preferredLocation if set
+     */
+    private[streaming] def scheduleReceivers(receivers: Seq[Receiver[_]],
+      executors: List[String]): Array[ArrayBuffer[String]] = {
+      val locations = new Array[ArrayBuffer[String]](receivers.length)
+      var i = 0
+      for (i <- 0 until receivers.length) {
+        locations(i) = new ArrayBuffer[String]()
+        if (receivers(i).preferredLocation.isDefined) {
+          locations(i) += receivers(i).preferredLocation.get
+        }
+      }
+      var count = 0
+      for (i <- 0 until max(receivers.length, executors.length)) {
+        if (!receivers(i % receivers.length).preferredLocation.isDefined) {
+          locations(i % receivers.length) += executors(count)
+          count += 1
+          if (count == executors.length) {
+            count = 0
+          }
+        }
+      }
+      locations
+    }
+
+    /**
      * Get the receivers from the ReceiverInputDStreams, distributes them to the
      * worker nodes as a parallel collection, and runs them.
      */
@@ -283,18 +320,6 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
         rcvr
       })
 
-      // Right now, we only honor preferences if all receivers have them
-      val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _)
-
-      // Create the parallel collection of receivers to distributed them on the worker nodes
-      val tempRDD =
-        if (hasLocationPreferences) {
-          val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
-          ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
-        } else {
-          ssc.sc.makeRDD(receivers, receivers.size)
-        }
-
       val checkpointDirOption = Option(ssc.checkpointDir)
       val serializableHadoopConf =
         new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)
@@ -311,12 +336,25 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
         supervisor.start()
         supervisor.awaitTermination()
       }
+
       // Run the dummy Spark job to ensure that all slaves have registered.
       // This avoids all the receivers to be scheduled on the same node.
       if (!ssc.sparkContext.isLocal) {
         ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
       }
 
+      // Get the list of executors and schedule receivers
+      val executors = getExecutors(ssc)
+      val tempRDD =
+        if (!executors.isEmpty) {
+          val locations = scheduleReceivers(receivers, executors)
+          val roundRobinReceivers = (0 until receivers.length).map(i =>
+            (receivers(i), locations(i)))
+          ssc.sc.makeRDD[Receiver[_]](roundRobinReceivers)
+        } else {
+          ssc.sc.makeRDD(receivers, receivers.size)
+        }
+
       // Distribute the receivers and start them
       logInfo("Starting " + receivers.length + " receivers")
       running = true

http://git-wip-us.apache.org/repos/asf/spark/blob/ca7e460f/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
new file mode 100644
index 0000000..a6e7838
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler
+
+import org.apache.spark.streaming._
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.receiver._
+import org.apache.spark.util.Utils
+
+/** Testsuite for receiver scheduling */
+class ReceiverTrackerSuite extends TestSuiteBase {
+  val sparkConf = new SparkConf().setMaster("local[8]").setAppName("test")
+  val ssc = new StreamingContext(sparkConf, Milliseconds(100))
+  val tracker = new ReceiverTracker(ssc)
+  val launcher = new tracker.ReceiverLauncher()
+  val executors: List[String] = List("0", "1", "2", "3")
+
+  test("receiver scheduling - all or none have preferred location") {
+
+    def parse(s: String): Array[Array[String]] = {
+      val outerSplit = s.split("\\|")
+      val loc = new Array[Array[String]](outerSplit.length)
+      var i = 0
+      for (i <- 0 until outerSplit.length) {
+        loc(i) = outerSplit(i).split("\\,")
+      }
+      loc
+    }
+
+    def testScheduler(numReceivers: Int, preferredLocation: Boolean, allocation: String) {
+      val receivers =
+        if (preferredLocation) {
+          Array.tabulate(numReceivers)(i => new DummyReceiver(host =
+            Some(((i + 1) % executors.length).toString)))
+        } else {
+          Array.tabulate(numReceivers)(_ => new DummyReceiver)
+        }
+      val locations = launcher.scheduleReceivers(receivers, executors)
+      val expectedLocations = parse(allocation)
+      assert(locations.deep === expectedLocations.deep)
+    }
+
+    testScheduler(numReceivers = 5, preferredLocation = false, allocation = "0|1|2|3|0")
+    testScheduler(numReceivers = 3, preferredLocation = false, allocation = "0,3|1|2")
+    testScheduler(numReceivers = 4, preferredLocation = true, allocation = "1|2|3|0")
+  }
+
+  test("receiver scheduling - some have preferred location") {
+    val numReceivers = 4;
+    val receivers: Seq[Receiver[_]] = Seq(new DummyReceiver(host = Some("1")),
+      new DummyReceiver, new DummyReceiver, new DummyReceiver)
+    val locations = launcher.scheduleReceivers(receivers, executors)
+    assert(locations(0)(0) === "1")
+    assert(locations(1)(0) === "0")
+    assert(locations(2)(0) === "1")
+    assert(locations(0).length === 1)
+    assert(locations(3).length === 1)
+  }
+}
+
+/**
+ * Dummy receiver implementation
+ */
+private class DummyReceiver(host: Option[String] = None)
+  extends Receiver[Int](StorageLevel.MEMORY_ONLY) {
+
+  def onStart() {
+  }
+
+  def onStop() {
+  }
+
+  override def preferredLocation: Option[String] = host
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org