You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2015/10/28 00:14:37 UTC

spark git commit: [SPARK-11212][CORE][STREAMING] Make preferred locations support ExecutorCacheTaskLocation and update…

Repository: spark
Updated Branches:
  refs/heads/master 4f030b9e8 -> 9fbd75ab5


[SPARK-11212][CORE][STREAMING] Make preferred locations support ExecutorCacheTaskLocation and update…

… ReceiverTracker and ReceiverSchedulingPolicy to use it

This PR includes the following changes:

1. Add a new preferred location format, `executor_<host>_<executorID>` (e.g., "executor_localhost_2"), to support specifying executor locations for an RDD.
2. Use the new preferred location format in `ReceiverTracker` to optimize the starting time of Receivers when there are multiple executors in a host.

The goal of this PR is to enable the streaming scheduler to place receivers (which run as tasks) in specific executors. Basically, I want to have more control over the placement of the receivers so that they are evenly distributed among the executors. We tried to do this without changing the core scheduling logic, but it does not allow specifying a particular executor as the preferred location, only a host. So if there are two executors on the same host and I want two receivers to run on them (one on each executor), I cannot specify that. The current code only specifies the host as the preference, which may end up launching both receivers on the same executor. We try to work around this by restarting a receiver when it does not launch in the desired executor, hoping that next time it will be started in the right one. But that causes lots of restarts and delays in correctly launching the receiver.

So this change would allow the streaming scheduler to specify the exact executor as the preferred location. Also, this is not exposed to the user; only the streaming scheduler uses it.

Author: zsxwing <zs...@gmail.com>

Closes #9181 from zsxwing/executor-location.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9fbd75ab
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9fbd75ab
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9fbd75ab

Branch: refs/heads/master
Commit: 9fbd75ab5d46612e52116ec5b9ced70715cf26b5
Parents: 4f030b9
Author: zsxwing <zs...@gmail.com>
Authored: Tue Oct 27 16:14:33 2015 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Tue Oct 27 16:14:33 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/scheduler/TaskLocation.scala   |  17 ++-
 .../spark/scheduler/TaskSetManagerSuite.scala   |   1 +
 .../receiver/ReceiverSupervisorImpl.scala       |   5 +-
 .../scheduler/ReceiverSchedulingPolicy.scala    | 110 +++++++++++--------
 .../streaming/scheduler/ReceiverTracker.scala   |  93 +++++++++-------
 .../scheduler/ReceiverTrackingInfo.scala        |   9 +-
 .../ReceiverSchedulingPolicySuite.scala         | 110 ++++++++++++-------
 .../scheduler/ReceiverTrackerSuite.scala        |   4 +-
 8 files changed, 217 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9fbd75ab/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala
index 1b65926..1eb6c16 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala
@@ -31,7 +31,9 @@ private[spark] sealed trait TaskLocation {
  */
 private [spark]
 case class ExecutorCacheTaskLocation(override val host: String, executorId: String)
-  extends TaskLocation
+  extends TaskLocation {
+  override def toString: String = s"${TaskLocation.executorLocationTag}${host}_$executorId"
+}
 
 /**
  * A location on a host.
@@ -53,6 +55,9 @@ private[spark] object TaskLocation {
   // confusion.  See  RFC 952 and RFC 1123 for information about the format of hostnames.
   val inMemoryLocationTag = "hdfs_cache_"
 
+  // Identify locations of executors with this prefix.
+  val executorLocationTag = "executor_"
+
   def apply(host: String, executorId: String): TaskLocation = {
     new ExecutorCacheTaskLocation(host, executorId)
   }
@@ -65,7 +70,15 @@ private[spark] object TaskLocation {
   def apply(str: String): TaskLocation = {
     val hstr = str.stripPrefix(inMemoryLocationTag)
     if (hstr.equals(str)) {
-      new HostTaskLocation(str)
+      if (str.startsWith(executorLocationTag)) {
+        val splits = str.split("_")
+        if (splits.length != 3) {
+          throw new IllegalArgumentException("Illegal executor location format: " + str)
+        }
+        new ExecutorCacheTaskLocation(splits(1), splits(2))
+      } else {
+        new HostTaskLocation(str)
+      }
     } else {
       new HDFSCacheTaskLocation(hstr)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/9fbd75ab/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 695523c..cd6bf72 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -779,6 +779,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
   test("Test TaskLocation for different host type.") {
     assert(TaskLocation("host1") === HostTaskLocation("host1"))
     assert(TaskLocation("hdfs_cache_host1") === HDFSCacheTaskLocation("host1"))
+    assert(TaskLocation("executor_host1_3") === ExecutorCacheTaskLocation("host1", "3"))
   }
 
   def createTaskResult(id: Int): DirectTaskResult[Int] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/9fbd75ab/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 59ef58d..167f56a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -47,7 +47,8 @@ private[streaming] class ReceiverSupervisorImpl(
     checkpointDirOption: Option[String]
   ) extends ReceiverSupervisor(receiver, env.conf) with Logging {
 
-  private val hostPort = SparkEnv.get.blockManager.blockManagerId.hostPort
+  private val host = SparkEnv.get.blockManager.blockManagerId.host
+  private val executorId = SparkEnv.get.blockManager.blockManagerId.executorId
 
   private val receivedBlockHandler: ReceivedBlockHandler = {
     if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
@@ -179,7 +180,7 @@ private[streaming] class ReceiverSupervisorImpl(
 
   override protected def onReceiverStart(): Boolean = {
     val msg = RegisterReceiver(
-      streamId, receiver.getClass.getSimpleName, hostPort, endpoint)
+      streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
     trackerEndpoint.askWithRetry[Boolean](msg)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9fbd75ab/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
index d2b0be7..234bc86 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
@@ -20,8 +20,8 @@ package org.apache.spark.streaming.scheduler
 import scala.collection.Map
 import scala.collection.mutable
 
+import org.apache.spark.scheduler.{ExecutorCacheTaskLocation, TaskLocation}
 import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.util.Utils
 
 /**
  * A class that tries to schedule receivers with evenly distributed. There are two phases for
@@ -29,23 +29,23 @@ import org.apache.spark.util.Utils
  *
  * - The first phase is global scheduling when ReceiverTracker is starting and we need to schedule
  *   all receivers at the same time. ReceiverTracker will call `scheduleReceivers` at this phase.
- *   It will try to schedule receivers with evenly distributed. ReceiverTracker should update its
- *   receiverTrackingInfoMap according to the results of `scheduleReceivers`.
- *   `ReceiverTrackingInfo.scheduledExecutors` for each receiver will set to an executor list that
- *   contains the scheduled locations. Then when a receiver is starting, it will send a register
- *   request and `ReceiverTracker.registerReceiver` will be called. In
- *   `ReceiverTracker.registerReceiver`, if a receiver's scheduled executors is set, it should check
- *   if the location of this receiver is one of the scheduled executors, if not, the register will
+ *   It will try to schedule receivers such that they are evenly distributed. ReceiverTracker should
+ *   update its `receiverTrackingInfoMap` according to the results of `scheduleReceivers`.
+ *   `ReceiverTrackingInfo.scheduledLocations` for each receiver should be set to a location list
+ *   that contains the scheduled locations. Then when a receiver is starting, it will send a
+ *   register request and `ReceiverTracker.registerReceiver` will be called. In
+ *   `ReceiverTracker.registerReceiver`, if a receiver's scheduled locations is set, it should check
+ *   if the location of this receiver is one of the scheduled locations, if not, the register will
  *   be rejected.
  * - The second phase is local scheduling when a receiver is restarting. There are two cases of
  *   receiver restarting:
  *   - If a receiver is restarting because it's rejected due to the real location and the scheduled
- *     executors mismatching, in other words, it fails to start in one of the locations that
+ *     locations mismatching, in other words, it fails to start in one of the locations that
  *     `scheduleReceivers` suggested, `ReceiverTracker` should firstly choose the executors that are
- *     still alive in the list of scheduled executors, then use them to launch the receiver job.
- *   - If a receiver is restarting without a scheduled executors list, or the executors in the list
+ *     still alive in the list of scheduled locations, then use them to launch the receiver job.
+ *   - If a receiver is restarting without a scheduled locations list, or the executors in the list
  *     are dead, `ReceiverTracker` should call `rescheduleReceiver`. If so, `ReceiverTracker` should
- *     not set `ReceiverTrackingInfo.scheduledExecutors` for this executor, instead, it should clear
+ *     not set `ReceiverTrackingInfo.scheduledLocations` for this receiver, instead, it should clear
  *     it. Then when this receiver is registering, we can know this is a local scheduling, and
  *     `ReceiverTrackingInfo` should call `rescheduleReceiver` again to check if the launching
  *     location is matching.
@@ -69,9 +69,12 @@ private[streaming] class ReceiverSchedulingPolicy {
    * </ol>
    *
    * This method is called when we start to launch receivers at the first time.
+   *
+   * @return a map for receivers and their scheduled locations
    */
   def scheduleReceivers(
-      receivers: Seq[Receiver[_]], executors: Seq[String]): Map[Int, Seq[String]] = {
+      receivers: Seq[Receiver[_]],
+      executors: Seq[ExecutorCacheTaskLocation]): Map[Int, Seq[TaskLocation]] = {
     if (receivers.isEmpty) {
       return Map.empty
     }
@@ -80,16 +83,16 @@ private[streaming] class ReceiverSchedulingPolicy {
       return receivers.map(_.streamId -> Seq.empty).toMap
     }
 
-    val hostToExecutors = executors.groupBy(executor => Utils.parseHostPort(executor)._1)
-    val scheduledExecutors = Array.fill(receivers.length)(new mutable.ArrayBuffer[String])
-    val numReceiversOnExecutor = mutable.HashMap[String, Int]()
+    val hostToExecutors = executors.groupBy(_.host)
+    val scheduledLocations = Array.fill(receivers.length)(new mutable.ArrayBuffer[TaskLocation])
+    val numReceiversOnExecutor = mutable.HashMap[ExecutorCacheTaskLocation, Int]()
     // Set the initial value to 0
     executors.foreach(e => numReceiversOnExecutor(e) = 0)
 
     // Firstly, we need to respect "preferredLocation". So if a receiver has "preferredLocation",
     // we need to make sure the "preferredLocation" is in the candidate scheduled executor list.
     for (i <- 0 until receivers.length) {
-      // Note: preferredLocation is host but executors are host:port
+      // Note: preferredLocation is host but executors are host_executorId
       receivers(i).preferredLocation.foreach { host =>
         hostToExecutors.get(host) match {
           case Some(executorsOnHost) =>
@@ -97,7 +100,7 @@ private[streaming] class ReceiverSchedulingPolicy {
             // this host
             val leastScheduledExecutor =
               executorsOnHost.minBy(executor => numReceiversOnExecutor(executor))
-            scheduledExecutors(i) += leastScheduledExecutor
+            scheduledLocations(i) += leastScheduledExecutor
             numReceiversOnExecutor(leastScheduledExecutor) =
               numReceiversOnExecutor(leastScheduledExecutor) + 1
           case None =>
@@ -106,17 +109,20 @@ private[streaming] class ReceiverSchedulingPolicy {
             // 1. This executor is not up. But it may be up later.
             // 2. This executor is dead, or it's not a host in the cluster.
             // Currently, simply add host to the scheduled executors.
-            scheduledExecutors(i) += host
+
+            // Note: host could be `HDFSCacheTaskLocation`, so use `TaskLocation.apply` to handle
+            // this case
+            scheduledLocations(i) += TaskLocation(host)
         }
       }
     }
 
     // For those receivers that don't have preferredLocation, make sure we assign at least one
     // executor to them.
-    for (scheduledExecutorsForOneReceiver <- scheduledExecutors.filter(_.isEmpty)) {
+    for (scheduledLocationsForOneReceiver <- scheduledLocations.filter(_.isEmpty)) {
       // Select the executor that has the least receivers
       val (leastScheduledExecutor, numReceivers) = numReceiversOnExecutor.minBy(_._2)
-      scheduledExecutorsForOneReceiver += leastScheduledExecutor
+      scheduledLocationsForOneReceiver += leastScheduledExecutor
       numReceiversOnExecutor(leastScheduledExecutor) = numReceivers + 1
     }
 
@@ -124,22 +130,22 @@ private[streaming] class ReceiverSchedulingPolicy {
     val idleExecutors = numReceiversOnExecutor.filter(_._2 == 0).map(_._1)
     for (executor <- idleExecutors) {
       // Assign an idle executor to the receiver that has least candidate executors.
-      val leastScheduledExecutors = scheduledExecutors.minBy(_.size)
+      val leastScheduledExecutors = scheduledLocations.minBy(_.size)
       leastScheduledExecutors += executor
     }
 
-    receivers.map(_.streamId).zip(scheduledExecutors).toMap
+    receivers.map(_.streamId).zip(scheduledLocations).toMap
   }
 
   /**
-   * Return a list of candidate executors to run the receiver. If the list is empty, the caller can
+   * Return a list of candidate locations to run the receiver. If the list is empty, the caller can
    * run this receiver in arbitrary executor.
    *
    * This method tries to balance executors' load. Here is the approach to schedule executors
    * for a receiver.
    * <ol>
    *   <li>
-   *     If preferredLocation is set, preferredLocation should be one of the candidate executors.
+   *     If preferredLocation is set, preferredLocation should be one of the candidate locations.
    *   </li>
    *   <li>
    *     Every executor will be assigned to a weight according to the receivers running or
@@ -163,40 +169,58 @@ private[streaming] class ReceiverSchedulingPolicy {
       receiverId: Int,
       preferredLocation: Option[String],
       receiverTrackingInfoMap: Map[Int, ReceiverTrackingInfo],
-      executors: Seq[String]): Seq[String] = {
+      executors: Seq[ExecutorCacheTaskLocation]): Seq[TaskLocation] = {
     if (executors.isEmpty) {
       return Seq.empty
     }
 
     // Always try to schedule to the preferred locations
-    val scheduledExecutors = mutable.Set[String]()
-    scheduledExecutors ++= preferredLocation
-
-    val executorWeights = receiverTrackingInfoMap.values.flatMap { receiverTrackingInfo =>
-      receiverTrackingInfo.state match {
-        case ReceiverState.INACTIVE => Nil
-        case ReceiverState.SCHEDULED =>
-          val scheduledExecutors = receiverTrackingInfo.scheduledExecutors.get
-          // The probability that a scheduled receiver will run in an executor is
-          // 1.0 / scheduledLocations.size
-          scheduledExecutors.map(location => location -> (1.0 / scheduledExecutors.size))
-        case ReceiverState.ACTIVE => Seq(receiverTrackingInfo.runningExecutor.get -> 1.0)
-      }
-    }.groupBy(_._1).mapValues(_.map(_._2).sum) // Sum weights for each executor
+    val scheduledLocations = mutable.Set[TaskLocation]()
+    // Note: preferredLocation could be `HDFSCacheTaskLocation`, so use `TaskLocation.apply` to
+    // handle this case
+    scheduledLocations ++= preferredLocation.map(TaskLocation(_))
+
+    val executorWeights: Map[ExecutorCacheTaskLocation, Double] = {
+      receiverTrackingInfoMap.values.flatMap(convertReceiverTrackingInfoToExecutorWeights)
+        .groupBy(_._1).mapValues(_.map(_._2).sum) // Sum weights for each executor
+    }
 
     val idleExecutors = executors.toSet -- executorWeights.keys
     if (idleExecutors.nonEmpty) {
-      scheduledExecutors ++= idleExecutors
+      scheduledLocations ++= idleExecutors
     } else {
       // There is no idle executor. So select all executors that have the minimum weight.
       val sortedExecutors = executorWeights.toSeq.sortBy(_._2)
       if (sortedExecutors.nonEmpty) {
         val minWeight = sortedExecutors(0)._2
-        scheduledExecutors ++= sortedExecutors.takeWhile(_._2 == minWeight).map(_._1)
+        scheduledLocations ++= sortedExecutors.takeWhile(_._2 == minWeight).map(_._1)
       } else {
         // This should not happen since "executors" is not empty
       }
     }
-    scheduledExecutors.toSeq
+    scheduledLocations.toSeq
+  }
+
+  /**
+   * This method tries to convert a receiver tracking info to executor weights. Every executor will
+   * be assigned to a weight according to the receivers running or scheduling on it:
+   *
+   * - If a receiver is running on an executor, it contributes 1.0 to the executor's weight.
+   * - If a receiver is scheduled to an executor but has not yet run, it contributes
+   * `1.0 / #candidate_executors_of_this_receiver` to the executor's weight.
+   */
+  private def convertReceiverTrackingInfoToExecutorWeights(
+      receiverTrackingInfo: ReceiverTrackingInfo): Seq[(ExecutorCacheTaskLocation, Double)] = {
+    receiverTrackingInfo.state match {
+      case ReceiverState.INACTIVE => Nil
+      case ReceiverState.SCHEDULED =>
+        val scheduledLocations = receiverTrackingInfo.scheduledLocations.get
+        // The probability that a scheduled receiver will run in an executor is
+        // 1.0 / scheduledLocations.size
+        scheduledLocations.filter(_.isInstanceOf[ExecutorCacheTaskLocation]).map { location =>
+          location.asInstanceOf[ExecutorCacheTaskLocation] -> (1.0 / scheduledLocations.size)
+        }
+      case ReceiverState.ACTIVE => Seq(receiverTrackingInfo.runningExecutor.get -> 1.0)
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9fbd75ab/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 2ce80d6..b183d85 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -17,20 +17,21 @@
 
 package org.apache.spark.streaming.scheduler
 
-import java.util.concurrent.{TimeUnit, CountDownLatch}
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 
 import scala.collection.mutable.HashMap
 import scala.concurrent.ExecutionContext
 import scala.language.existentials
 import scala.util.{Failure, Success}
 
-import org.apache.spark.streaming.util.WriteAheadLogUtils
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rpc._
+import org.apache.spark.scheduler.{TaskLocation, ExecutorCacheTaskLocation}
 import org.apache.spark.streaming.{StreamingContext, Time}
 import org.apache.spark.streaming.receiver._
-import org.apache.spark.util.{Utils, ThreadUtils, SerializableConfiguration}
+import org.apache.spark.streaming.util.WriteAheadLogUtils
+import org.apache.spark.util.{SerializableConfiguration, ThreadUtils, Utils}
 
 
 /** Enumeration to identify current state of a Receiver */
@@ -47,7 +48,8 @@ private[streaming] sealed trait ReceiverTrackerMessage
 private[streaming] case class RegisterReceiver(
     streamId: Int,
     typ: String,
-    hostPort: String,
+    host: String,
+    executorId: String,
     receiverEndpoint: RpcEndpointRef
   ) extends ReceiverTrackerMessage
 private[streaming] case class AddBlock(receivedBlockInfo: ReceivedBlockInfo)
@@ -235,7 +237,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
   private def registerReceiver(
       streamId: Int,
       typ: String,
-      hostPort: String,
+      host: String,
+      executorId: String,
       receiverEndpoint: RpcEndpointRef,
       senderAddress: RpcAddress
     ): Boolean = {
@@ -247,18 +250,23 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
       return false
     }
 
-    val scheduledExecutors = receiverTrackingInfos(streamId).scheduledExecutors
-    val accetableExecutors = if (scheduledExecutors.nonEmpty) {
+    val scheduledLocations = receiverTrackingInfos(streamId).scheduledLocations
+    val acceptableExecutors = if (scheduledLocations.nonEmpty) {
         // This receiver is registering and it's scheduled by
-        // ReceiverSchedulingPolicy.scheduleReceivers. So use "scheduledExecutors" to check it.
-        scheduledExecutors.get
+        // ReceiverSchedulingPolicy.scheduleReceivers. So use "scheduledLocations" to check it.
+        scheduledLocations.get
       } else {
         // This receiver is scheduled by "ReceiverSchedulingPolicy.rescheduleReceiver", so calling
         // "ReceiverSchedulingPolicy.rescheduleReceiver" again to check it.
         scheduleReceiver(streamId)
       }
 
-    if (!accetableExecutors.contains(hostPort)) {
+    def isAcceptable: Boolean = acceptableExecutors.exists {
+      case loc: ExecutorCacheTaskLocation => loc.executorId == executorId
+      case loc: TaskLocation => loc.host == host
+    }
+
+    if (!isAcceptable) {
       // Refuse it since it's scheduled to a wrong executor
       false
     } else {
@@ -266,8 +274,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
       val receiverTrackingInfo = ReceiverTrackingInfo(
         streamId,
         ReceiverState.ACTIVE,
-        scheduledExecutors = None,
-        runningExecutor = Some(hostPort),
+        scheduledLocations = None,
+        runningExecutor = Some(ExecutorCacheTaskLocation(host, executorId)),
         name = Some(name),
         endpoint = Some(receiverEndpoint))
       receiverTrackingInfos.put(streamId, receiverTrackingInfo)
@@ -338,25 +346,25 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
     logWarning(s"Error reported by receiver for stream $streamId: $messageWithError")
   }
 
-  private def scheduleReceiver(receiverId: Int): Seq[String] = {
+  private def scheduleReceiver(receiverId: Int): Seq[TaskLocation] = {
     val preferredLocation = receiverPreferredLocations.getOrElse(receiverId, None)
-    val scheduledExecutors = schedulingPolicy.rescheduleReceiver(
+    val scheduledLocations = schedulingPolicy.rescheduleReceiver(
       receiverId, preferredLocation, receiverTrackingInfos, getExecutors)
-    updateReceiverScheduledExecutors(receiverId, scheduledExecutors)
-    scheduledExecutors
+    updateReceiverScheduledExecutors(receiverId, scheduledLocations)
+    scheduledLocations
   }
 
   private def updateReceiverScheduledExecutors(
-      receiverId: Int, scheduledExecutors: Seq[String]): Unit = {
+      receiverId: Int, scheduledLocations: Seq[TaskLocation]): Unit = {
     val newReceiverTrackingInfo = receiverTrackingInfos.get(receiverId) match {
       case Some(oldInfo) =>
         oldInfo.copy(state = ReceiverState.SCHEDULED,
-          scheduledExecutors = Some(scheduledExecutors))
+          scheduledLocations = Some(scheduledLocations))
       case None =>
         ReceiverTrackingInfo(
           receiverId,
           ReceiverState.SCHEDULED,
-          Some(scheduledExecutors),
+          Some(scheduledLocations),
           runningExecutor = None)
     }
     receiverTrackingInfos.put(receiverId, newReceiverTrackingInfo)
@@ -370,13 +378,16 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
   /**
    * Get the list of executors excluding driver
    */
-  private def getExecutors: Seq[String] = {
+  private def getExecutors: Seq[ExecutorCacheTaskLocation] = {
     if (ssc.sc.isLocal) {
-      Seq(ssc.sparkContext.env.blockManager.blockManagerId.hostPort)
+      val blockManagerId = ssc.sparkContext.env.blockManager.blockManagerId
+      Seq(ExecutorCacheTaskLocation(blockManagerId.host, blockManagerId.executorId))
     } else {
       ssc.sparkContext.env.blockManager.master.getMemoryStatus.filter { case (blockManagerId, _) =>
         blockManagerId.executorId != SparkContext.DRIVER_IDENTIFIER // Ignore the driver location
-      }.map { case (blockManagerId, _) => blockManagerId.hostPort }.toSeq
+      }.map { case (blockManagerId, _) =>
+        ExecutorCacheTaskLocation(blockManagerId.host, blockManagerId.executorId)
+      }.toSeq
     }
   }
 
@@ -431,9 +442,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
     override def receive: PartialFunction[Any, Unit] = {
       // Local messages
       case StartAllReceivers(receivers) =>
-        val scheduledExecutors = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
+        val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
         for (receiver <- receivers) {
-          val executors = scheduledExecutors(receiver.streamId)
+          val executors = scheduledLocations(receiver.streamId)
           updateReceiverScheduledExecutors(receiver.streamId, executors)
           receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
           startReceiver(receiver, executors)
@@ -441,14 +452,14 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
       case RestartReceiver(receiver) =>
         // Old scheduled executors minus the ones that are not active any more
         val oldScheduledExecutors = getStoredScheduledExecutors(receiver.streamId)
-        val scheduledExecutors = if (oldScheduledExecutors.nonEmpty) {
+        val scheduledLocations = if (oldScheduledExecutors.nonEmpty) {
             // Try global scheduling again
             oldScheduledExecutors
           } else {
             val oldReceiverInfo = receiverTrackingInfos(receiver.streamId)
-            // Clear "scheduledExecutors" to indicate we are going to do local scheduling
+            // Clear "scheduledLocations" to indicate we are going to do local scheduling
             val newReceiverInfo = oldReceiverInfo.copy(
-              state = ReceiverState.INACTIVE, scheduledExecutors = None)
+              state = ReceiverState.INACTIVE, scheduledLocations = None)
             receiverTrackingInfos(receiver.streamId) = newReceiverInfo
             schedulingPolicy.rescheduleReceiver(
               receiver.streamId,
@@ -458,7 +469,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
           }
         // Assume there is one receiver restarting at one time, so we don't need to update
         // receiverTrackingInfos
-        startReceiver(receiver, scheduledExecutors)
+        startReceiver(receiver, scheduledLocations)
       case c: CleanupOldBlocks =>
         receiverTrackingInfos.values.flatMap(_.endpoint).foreach(_.send(c))
       case UpdateReceiverRateLimit(streamUID, newRate) =>
@@ -472,9 +483,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
 
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
       // Remote messages
-      case RegisterReceiver(streamId, typ, hostPort, receiverEndpoint) =>
+      case RegisterReceiver(streamId, typ, host, executorId, receiverEndpoint) =>
         val successful =
-          registerReceiver(streamId, typ, hostPort, receiverEndpoint, context.senderAddress)
+          registerReceiver(streamId, typ, host, executorId, receiverEndpoint, context.senderAddress)
         context.reply(successful)
       case AddBlock(receivedBlockInfo) =>
         context.reply(addBlock(receivedBlockInfo))
@@ -493,13 +504,16 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
     /**
      * Return the stored scheduled executors that are still alive.
      */
-    private def getStoredScheduledExecutors(receiverId: Int): Seq[String] = {
+    private def getStoredScheduledExecutors(receiverId: Int): Seq[TaskLocation] = {
       if (receiverTrackingInfos.contains(receiverId)) {
-        val scheduledExecutors = receiverTrackingInfos(receiverId).scheduledExecutors
-        if (scheduledExecutors.nonEmpty) {
+        val scheduledLocations = receiverTrackingInfos(receiverId).scheduledLocations
+        if (scheduledLocations.nonEmpty) {
           val executors = getExecutors.toSet
           // Only return the alive executors
-          scheduledExecutors.get.filter(executors)
+          scheduledLocations.get.filter {
+            case loc: ExecutorCacheTaskLocation => executors(loc)
+            case loc: TaskLocation => true
+          }
         } else {
           Nil
         }
@@ -511,7 +525,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
     /**
      * Start a receiver along with its scheduled executors
      */
-    private def startReceiver(receiver: Receiver[_], scheduledExecutors: Seq[String]): Unit = {
+    private def startReceiver(
+        receiver: Receiver[_],
+        scheduledLocations: Seq[TaskLocation]): Unit = {
       def shouldStartReceiver: Boolean = {
         // It's okay to start when trackerState is Initialized or Started
         !(isTrackerStopping || isTrackerStopped)
@@ -546,13 +562,12 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
           }
         }
 
-      // Create the RDD using the scheduledExecutors to run the receiver in a Spark job
+      // Create the RDD using the scheduledLocations to run the receiver in a Spark job
       val receiverRDD: RDD[Receiver[_]] =
-        if (scheduledExecutors.isEmpty) {
+        if (scheduledLocations.isEmpty) {
           ssc.sc.makeRDD(Seq(receiver), 1)
         } else {
-          val preferredLocations =
-            scheduledExecutors.map(hostPort => Utils.parseHostPort(hostPort)._1).distinct
+          val preferredLocations = scheduledLocations.map(_.toString).distinct
           ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
         }
       receiverRDD.setName(s"Receiver $receiverId")

http://git-wip-us.apache.org/repos/asf/spark/blob/9fbd75ab/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTrackingInfo.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTrackingInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTrackingInfo.scala
index 043ff4d..ab0a84f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTrackingInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTrackingInfo.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.streaming.scheduler
 
 import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.scheduler.{ExecutorCacheTaskLocation, TaskLocation}
 import org.apache.spark.streaming.scheduler.ReceiverState._
 
 private[streaming] case class ReceiverErrorInfo(
@@ -28,7 +29,7 @@ private[streaming] case class ReceiverErrorInfo(
  *
  * @param receiverId the unique receiver id
  * @param state the current Receiver state
- * @param scheduledExecutors the scheduled executors provided by ReceiverSchedulingPolicy
+ * @param scheduledLocations the scheduled locations provided by ReceiverSchedulingPolicy
  * @param runningExecutor the running executor if the receiver is active
  * @param name the receiver name
  * @param endpoint the receiver endpoint. It can be used to send messages to the receiver
@@ -37,8 +38,8 @@ private[streaming] case class ReceiverErrorInfo(
 private[streaming] case class ReceiverTrackingInfo(
     receiverId: Int,
     state: ReceiverState,
-    scheduledExecutors: Option[Seq[String]],
-    runningExecutor: Option[String],
+    scheduledLocations: Option[Seq[TaskLocation]],
+    runningExecutor: Option[ExecutorCacheTaskLocation],
     name: Option[String] = None,
     endpoint: Option[RpcEndpointRef] = None,
     errorInfo: Option[ReceiverErrorInfo] = None) {
@@ -47,7 +48,7 @@ private[streaming] case class ReceiverTrackingInfo(
     receiverId,
     name.getOrElse(""),
     state == ReceiverState.ACTIVE,
-    location = runningExecutor.getOrElse(""),
+    location = runningExecutor.map(_.host).getOrElse(""),
     lastErrorMessage = errorInfo.map(_.lastErrorMessage).getOrElse(""),
     lastError = errorInfo.map(_.lastError).getOrElse(""),
     lastErrorTime = errorInfo.map(_.lastErrorTime).getOrElse(-1L)

http://git-wip-us.apache.org/repos/asf/spark/blob/9fbd75ab/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala
index b2a51d7..05b4e66 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala
@@ -20,73 +20,96 @@ package org.apache.spark.streaming.scheduler
 import scala.collection.mutable
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.scheduler.{ExecutorCacheTaskLocation, HostTaskLocation, TaskLocation}
 
 class ReceiverSchedulingPolicySuite extends SparkFunSuite {
 
   val receiverSchedulingPolicy = new ReceiverSchedulingPolicy
 
   test("rescheduleReceiver: empty executors") {
-    val scheduledExecutors =
+    val scheduledLocations =
       receiverSchedulingPolicy.rescheduleReceiver(0, None, Map.empty, executors = Seq.empty)
-    assert(scheduledExecutors === Seq.empty)
+    assert(scheduledLocations === Seq.empty)
   }
 
   test("rescheduleReceiver: receiver preferredLocation") {
+    val executors = Seq(ExecutorCacheTaskLocation("host2", "2"))
     val receiverTrackingInfoMap = Map(
       0 -> ReceiverTrackingInfo(0, ReceiverState.INACTIVE, None, None))
-    val scheduledExecutors = receiverSchedulingPolicy.rescheduleReceiver(
-      0, Some("host1"), receiverTrackingInfoMap, executors = Seq("host2"))
-    assert(scheduledExecutors.toSet === Set("host1", "host2"))
+    val scheduledLocations = receiverSchedulingPolicy.rescheduleReceiver(
+      0, Some("host1"), receiverTrackingInfoMap, executors)
+    assert(scheduledLocations.toSet === Set(HostTaskLocation("host1"), executors(0)))
   }
 
   test("rescheduleReceiver: return all idle executors if there are any idle executors") {
-    val executors = Seq("host1", "host2", "host3", "host4", "host5")
-    // host3 is idle
+    val executors = (1 to 5).map(i => ExecutorCacheTaskLocation(s"host$i", s"$i"))
+    // executor 1 is busy, others are idle.
     val receiverTrackingInfoMap = Map(
-      0 -> ReceiverTrackingInfo(0, ReceiverState.ACTIVE, None, Some("host1")))
-    val scheduledExecutors = receiverSchedulingPolicy.rescheduleReceiver(
+      0 -> ReceiverTrackingInfo(0, ReceiverState.ACTIVE, None, Some(executors(0))))
+    val scheduledLocations = receiverSchedulingPolicy.rescheduleReceiver(
       1, None, receiverTrackingInfoMap, executors)
-    assert(scheduledExecutors.toSet === Set("host2", "host3", "host4", "host5"))
+    assert(scheduledLocations.toSet === executors.tail.toSet)
   }
 
   test("rescheduleReceiver: return all executors that have minimum weight if no idle executors") {
-    val executors = Seq("host1", "host2", "host3", "host4", "host5")
+    val executors = Seq(
+      ExecutorCacheTaskLocation("host1", "1"),
+      ExecutorCacheTaskLocation("host2", "2"),
+      ExecutorCacheTaskLocation("host3", "3"),
+      ExecutorCacheTaskLocation("host4", "4"),
+      ExecutorCacheTaskLocation("host5", "5")
+    )
     // Weights: host1 = 1.5, host2 = 0.5, host3 = 1.0, host4 = 0.5, host5 = 0.5
     val receiverTrackingInfoMap = Map(
-      0 -> ReceiverTrackingInfo(0, ReceiverState.ACTIVE, None, Some("host1")),
-      1 -> ReceiverTrackingInfo(1, ReceiverState.SCHEDULED, Some(Seq("host2", "host3")), None),
-      2 -> ReceiverTrackingInfo(2, ReceiverState.SCHEDULED, Some(Seq("host1", "host3")), None),
-      3 -> ReceiverTrackingInfo(4, ReceiverState.SCHEDULED, Some(Seq("host4", "host5")), None))
-    val scheduledExecutors = receiverSchedulingPolicy.rescheduleReceiver(
+      0 -> ReceiverTrackingInfo(0, ReceiverState.ACTIVE, None,
+        Some(ExecutorCacheTaskLocation("host1", "1"))),
+      1 -> ReceiverTrackingInfo(1, ReceiverState.SCHEDULED,
+        Some(Seq(ExecutorCacheTaskLocation("host2", "2"), ExecutorCacheTaskLocation("host3", "3"))),
+        None),
+      2 -> ReceiverTrackingInfo(2, ReceiverState.SCHEDULED,
+        Some(Seq(ExecutorCacheTaskLocation("host1", "1"), ExecutorCacheTaskLocation("host3", "3"))),
+        None),
+      3 -> ReceiverTrackingInfo(4, ReceiverState.SCHEDULED,
+        Some(Seq(ExecutorCacheTaskLocation("host4", "4"),
+          ExecutorCacheTaskLocation("host5", "5"))), None))
+    val scheduledLocations = receiverSchedulingPolicy.rescheduleReceiver(
       4, None, receiverTrackingInfoMap, executors)
-    assert(scheduledExecutors.toSet === Set("host2", "host4", "host5"))
+    val expectedScheduledLocations = Set(
+      ExecutorCacheTaskLocation("host2", "2"),
+      ExecutorCacheTaskLocation("host4", "4"),
+      ExecutorCacheTaskLocation("host5", "5")
+    )
+    assert(scheduledLocations.toSet === expectedScheduledLocations)
   }
 
   test("scheduleReceivers: " +
     "schedule receivers evenly when there are more receivers than executors") {
     val receivers = (0 until 6).map(new RateTestReceiver(_))
-    val executors = (10000 until 10003).map(port => s"localhost:${port}")
-    val scheduledExecutors = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
-    val numReceiversOnExecutor = mutable.HashMap[String, Int]()
+    val executors = (0 until 3).map(executorId =>
+      ExecutorCacheTaskLocation("localhost", executorId.toString))
+    val scheduledLocations = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
+    val numReceiversOnExecutor = mutable.HashMap[TaskLocation, Int]()
     // There should be 2 receivers running on each executor and each receiver has one executor
-    scheduledExecutors.foreach { case (receiverId, executors) =>
-      assert(executors.size == 1)
-      numReceiversOnExecutor(executors(0)) = numReceiversOnExecutor.getOrElse(executors(0), 0) + 1
+    scheduledLocations.foreach { case (receiverId, locations) =>
+      assert(locations.size == 1)
+      assert(locations(0).isInstanceOf[ExecutorCacheTaskLocation])
+      numReceiversOnExecutor(locations(0)) = numReceiversOnExecutor.getOrElse(locations(0), 0) + 1
     }
     assert(numReceiversOnExecutor === executors.map(_ -> 2).toMap)
   }
 
-
   test("scheduleReceivers: " +
     "schedule receivers evenly when there are more executors than receivers") {
     val receivers = (0 until 3).map(new RateTestReceiver(_))
-    val executors = (10000 until 10006).map(port => s"localhost:${port}")
-    val scheduledExecutors = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
-    val numReceiversOnExecutor = mutable.HashMap[String, Int]()
+    val executors = (0 until 6).map(executorId =>
+      ExecutorCacheTaskLocation("localhost", executorId.toString))
+    val scheduledLocations = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
+    val numReceiversOnExecutor = mutable.HashMap[TaskLocation, Int]()
     // There should be 1 receiver running on each executor and each receiver has two executors
-    scheduledExecutors.foreach { case (receiverId, executors) =>
-      assert(executors.size == 2)
-      executors.foreach { l =>
+    scheduledLocations.foreach { case (receiverId, locations) =>
+      assert(locations.size == 2)
+      locations.foreach { l =>
+        assert(l.isInstanceOf[ExecutorCacheTaskLocation])
         numReceiversOnExecutor(l) = numReceiversOnExecutor.getOrElse(l, 0) + 1
       }
     }
@@ -96,34 +119,41 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
   test("scheduleReceivers: schedule receivers evenly when the preferredLocations are even") {
     val receivers = (0 until 3).map(new RateTestReceiver(_)) ++
       (3 until 6).map(new RateTestReceiver(_, Some("localhost")))
-    val executors = (10000 until 10003).map(port => s"localhost:${port}") ++
-      (10003 until 10006).map(port => s"localhost2:${port}")
-    val scheduledExecutors = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
-    val numReceiversOnExecutor = mutable.HashMap[String, Int]()
+    val executors = (0 until 3).map(executorId =>
+      ExecutorCacheTaskLocation("localhost", executorId.toString)) ++
+      (3 until 6).map(executorId =>
+        ExecutorCacheTaskLocation("localhost2", executorId.toString))
+    val scheduledLocations = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
+    val numReceiversOnExecutor = mutable.HashMap[TaskLocation, Int]()
     // There should be 1 receiver running on each executor and each receiver has 1 executor
-    scheduledExecutors.foreach { case (receiverId, executors) =>
+    scheduledLocations.foreach { case (receiverId, executors) =>
       assert(executors.size == 1)
       executors.foreach { l =>
+        assert(l.isInstanceOf[ExecutorCacheTaskLocation])
         numReceiversOnExecutor(l) = numReceiversOnExecutor.getOrElse(l, 0) + 1
       }
     }
     assert(numReceiversOnExecutor === executors.map(_ -> 1).toMap)
     // Make sure we schedule the receivers to their preferredLocations
     val executorsForReceiversWithPreferredLocation =
-      scheduledExecutors.filter { case (receiverId, executors) => receiverId >= 3 }.flatMap(_._2)
+      scheduledLocations.filter { case (receiverId, executors) => receiverId >= 3 }.flatMap(_._2)
     // We can simply check the executor set because we only know each receiver only has 1 executor
     assert(executorsForReceiversWithPreferredLocation.toSet ===
-      (10000 until 10003).map(port => s"localhost:${port}").toSet)
+      (0 until 3).map(executorId =>
+        ExecutorCacheTaskLocation("localhost", executorId.toString)
+      ).toSet)
   }
 
   test("scheduleReceivers: return empty if no receiver") {
-    assert(receiverSchedulingPolicy.scheduleReceivers(Seq.empty, Seq("localhost:10000")).isEmpty)
+    val scheduledLocations = receiverSchedulingPolicy.
+      scheduleReceivers(Seq.empty, Seq(ExecutorCacheTaskLocation("localhost", "1")))
+    assert(scheduledLocations.isEmpty)
   }
 
   test("scheduleReceivers: return empty scheduled executors if no executors") {
     val receivers = (0 until 3).map(new RateTestReceiver(_))
-    val scheduledExecutors = receiverSchedulingPolicy.scheduleReceivers(receivers, Seq.empty)
-    scheduledExecutors.foreach { case (receiverId, executors) =>
+    val scheduledLocations = receiverSchedulingPolicy.scheduleReceivers(receivers, Seq.empty)
+    scheduledLocations.foreach { case (receiverId, executors) =>
       assert(executors.isEmpty)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/9fbd75ab/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
index fda86ae..3bd8d08 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
@@ -99,8 +99,8 @@ class ReceiverTrackerSuite extends TestSuiteBase {
       output.register()
       ssc.start()
       eventually(timeout(10 seconds), interval(10 millis)) {
-        // If preferredLocations is set correctly, receiverTaskLocality should be NODE_LOCAL
-        assert(receiverTaskLocality === TaskLocality.NODE_LOCAL)
+        // If preferredLocations is set correctly, receiverTaskLocality should be PROCESS_LOCAL
+        assert(receiverTaskLocality === TaskLocality.PROCESS_LOCAL)
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org