Posted to reviews@spark.apache.org by tgravescs <gi...@git.apache.org> on 2016/04/01 20:51:19 UTC

[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

GitHub user tgravescs opened a pull request:

    https://github.com/apache/spark/pull/12113

    [SPARK-1239] Improve fetching of map output statuses

    The main issue we are trying to solve is memory bloat on the Driver when tasks request the map output statuses.  With a large number of tasks you either need a huge amount of memory on the Driver or you have to repartition to a smaller number.  This makes it really difficult to run more than, say, 50,000 tasks.
    
    The main issues that cause the memory bloat are:
    1) There is no flow control on sending the map output status responses.  We serialize the map output statuses and then hand them off to Netty to send.  Netty sends asynchronously and can't send them fast enough to keep up with incoming requests, so we end up with lots of copies of the serialized map output statuses sitting in memory.  This causes huge bloat when you have tens of thousands of tasks and the map output statuses are in the tens of MB.
    2) When the initial reduce tasks start up, they all request the map output statuses from the Driver.  These requests are handled by multiple threads in parallel, so even though we check for a cached version, before one exists many of the initial requests can all end up serializing the exact same map output statuses.
    
    This patch does a couple of things:
    - When the map output status size is over a threshold (default 512K), it uses broadcast to send the map statuses.  This means we no longer serialize a large map output status, and thus we don't have issues with memory bloat.  The message sizes are now in the 300-400 byte range and the map output statuses are broadcast.  If the size is under the threshold, it sends the statuses as before, except that the message now contains the DIRECT indicator.
    - Synchronize the incoming requests so that one thread caches the serialized output and broadcasts the map output statuses, which can then be used by everyone else.  This ensures we don't create multiple broadcast variables when we don't need to.  To make this work I added a second thread pool, which the Dispatcher hands the requests to, so that those threads can block without blocking the main dispatcher threads (which would prevent things like heartbeats from coming through).
    
    Note that some of the design and code was contributed by @mridulm
    
    ## How was this patch tested?
    
    Unit tests and a lot of manual testing.
    Ran with akka and netty rpc. Ran with both dynamic allocation on and off.
    
    One of the large jobs I used to test this was a join of 15TB of data.  It had 200,000 map tasks and 20,000 reduce tasks.  Executors ranged from 200 to 2000.  This job ran successfully with 5GB of memory on the driver with these changes.  Without these changes I was using 20GB and only had 500 reduce tasks.  The job has 50MB of serialized map output statuses and took roughly the same amount of time for the executors to get the map output statuses as before.
    
    Ran a variety of other jobs, from large wordcounts to small ones not using broadcasts. 
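
The request synchronization described above (one thread serializes and caches, the others wait and reuse the result) can be sketched roughly as follows. This is a minimal illustration and not the patch itself: `SerializeOnceSketch`, `expensiveSerialize`, and the `serializations` counter are hypothetical names, and the epoch handling of the real code is left out.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the tracker's cache; not Spark's actual class.
object SerializeOnceSketch {
  private val cache = new ConcurrentHashMap[Int, Array[Byte]]()
  private val locks = new ConcurrentHashMap[Int, AnyRef]()

  // Counts how often the expensive step actually runs (for illustration only).
  val serializations = new AtomicInteger(0)

  // Placeholder for the expensive serialization of map output statuses.
  private def expensiveSerialize(shuffleId: Int): Array[Byte] = {
    serializations.incrementAndGet()
    Array.fill[Byte](16)(shuffleId.toByte)
  }

  def getSerialized(shuffleId: Int): Array[Byte] = {
    val cached = cache.get(shuffleId)
    if (cached != null) {
      cached
    } else {
      // One lock object per shuffle id, installed atomically on first use.
      val newLock = new Object()
      val prevLock = locks.putIfAbsent(shuffleId, newLock)
      val lock = if (prevLock != null) prevLock else newLock
      lock.synchronized {
        // Re-check: another thread may have filled the cache while we waited.
        val again = cache.get(shuffleId)
        if (again != null) {
          again
        } else {
          val bytes = expensiveSerialize(shuffleId)
          cache.put(shuffleId, bytes)
          bytes
        }
      }
    }
  }
}
```

Because the lock is per shuffle id, requests for different shuffles do not wait on each other; only requests for the same shuffle are serialized behind the first one.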


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tgravescs/spark SPARK-1239

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/12113.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #12113
    
----
commit 8e4f2efea08b7013a2702543dc3860f9d277e3ac
Author: Thomas Graves <tg...@staydecay.corp.gq1.yahoo.com>
Date:   2016-04-01T17:44:44Z

    [SPARK-1239] Don't fetch all map output statuses at each reducer during shuffles

commit 3c1def02b80ebfed55904e504609aa02de6559ce
Author: Thomas Graves <tg...@staydecay.corp.gq1.yahoo.com>
Date:   2016-04-01T18:49:49Z

    Update unit test

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12113#discussion_r58698221
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -492,16 +624,51 @@ private[spark] object MapOutputTracker extends Logging {
         } {
           objOut.close()
         }
    -    out.toByteArray
    +    val arr = out.toByteArray
    +    if (minBroadcastSize >= 0 && arr.length >= minBroadcastSize) {
    +      // Use broadcast instead.
    +      // Important arr(0) is the tag == DIRECT, ignore that while deserializing !
    +      val bcast = broadcastManager.newBroadcast(arr, isLocal)
    +      // toByteArray creates copy, so we can reuse out
    +      out.reset()
    +      out.write(BROADCAST)
    +      val oos = new ObjectOutputStream(new GZIPOutputStream(out))
    +      oos.writeObject(bcast)
    +      oos.close()
    +      val outArr = out.toByteArray
    +      logInfo("Broadcast mapstatuses size = " + outArr.length + ", actual size = " + arr.length)
    +      (outArr, bcast)
    +    } else {
    +      (arr, null)
    +    }
       }
     
       // Opposite of serializeMapStatuses.
       def deserializeMapStatuses(bytes: Array[Byte]): Array[MapStatus] = {
    -    val objIn = new ObjectInputStream(new GZIPInputStream(new ByteArrayInputStream(bytes)))
    -    Utils.tryWithSafeFinally {
    -      objIn.readObject().asInstanceOf[Array[MapStatus]]
    -    } {
    -      objIn.close()
    +    assert (bytes.length > 0)
    +
    +    def deserializeObject(arr: Array[Byte], off: Int, len: Int): AnyRef = {
    +      val objIn = new ObjectInputStream(new GZIPInputStream(
    +        new ByteArrayInputStream(arr, off, len)))
    +      Utils.tryWithSafeFinally {
    +        objIn.readObject()
    +      } {
    +        objIn.close()
    +      }
    +    }
    +
    +    bytes(0) match {
    +      case DIRECT =>
    +        deserializeObject(bytes, 1, bytes.length - 1).asInstanceOf[Array[MapStatus]]
    +      case BROADCAST =>
    +        // deserialize the Broadcast, pull .value array out of it, and then deserialize that
    +        val bcast = deserializeObject(bytes, 1, bytes.length - 1).
    +          asInstanceOf[Broadcast[Array[Byte]]]
    +        logInfo("Broadcast mapstatuses size = " + bytes.length +
    +          ", actual size = " + bcast.value.length)
    +        // Important - ignore the DIRECT tag ! Start from offset 1
    +        deserializeObject(bcast.value, 1, bcast.value.length - 1).asInstanceOf[Array[MapStatus]]
    --- End diff --
    
    Sorry, I'm not sure I follow the question.
    
    The deserialize gets the small message, which indicates that it's a broadcast, and then each executor fetches the broadcast variable just like any other broadcast variable (such as one created by the user).  Multiple executors can fetch at the same time, and it uses the torrent broadcast, which shares the load as other executors get parts.
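
The tag-byte framing discussed in this diff can be sketched in isolation as follows. This is a simplified illustration under stated assumptions: `TaggedFramingSketch` and its in-memory `store` are hypothetical stand-ins for the broadcast machinery, and `Array[String]` stands in for `Array[MapStatus]`. As in the diff, the stored payload keeps its leading DIRECT tag, so reading it back starts at offset 1.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object TaggedFramingSketch {
  val DIRECT: Byte = 0
  val BROADCAST: Byte = 1

  // Hypothetical stand-in for a broadcast store: id -> full tagged payload.
  private val store = scala.collection.mutable.Map.empty[Long, Array[Byte]]

  // Writes one tag byte followed by the GZIP-compressed serialized object.
  private def frame(tag: Byte, obj: AnyRef): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    out.write(tag)
    val oos = new ObjectOutputStream(new GZIPOutputStream(out))
    try oos.writeObject(obj) finally oos.close()
    out.toByteArray
  }

  // Skips the tag byte at offset 0 and deserializes the rest.
  private def unframe(bytes: Array[Byte]): AnyRef = {
    val in = new ObjectInputStream(new GZIPInputStream(
      new ByteArrayInputStream(bytes, 1, bytes.length - 1)))
    try in.readObject() finally in.close()
  }

  def serialize(statuses: Array[String], minBroadcastSize: Int): Array[Byte] = {
    val direct = frame(DIRECT, statuses)
    if (minBroadcastSize >= 0 && direct.length >= minBroadcastSize) {
      // Large payload: park it in the store and send only a small reference.
      val id = store.size.toLong
      store(id) = direct
      frame(BROADCAST, java.lang.Long.valueOf(id))
    } else {
      direct
    }
  }

  def deserialize(bytes: Array[Byte]): Array[String] = bytes(0) match {
    case DIRECT =>
      unframe(bytes).asInstanceOf[Array[String]]
    case BROADCAST =>
      val id = unframe(bytes).asInstanceOf[java.lang.Long].longValue
      // The stored payload still carries its DIRECT tag; unframe skips it.
      unframe(store(id)).asInstanceOf[Array[String]]
    case other =>
      throw new IllegalArgumentException(s"Unknown tag: $other")
  }
}
```

In the real patch the reference is a `Broadcast[Array[Byte]]`, and reading its `value` is what triggers the torrent fetch on each executor; the map lookup here only imitates that step.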


[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12113#issuecomment-206618621
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/55152/


[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12113#discussion_r62112043
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -428,40 +503,89 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
         }
       }
     
    +  private def removeBroadcast(bcast: Broadcast[_]): Unit = {
    +    if (null != bcast) {
    +      broadcastManager.unbroadcast(bcast.id,
    +        removeFromDriver = true, blocking = false)
    +    }
    +  }
    +
    +  private def clearCachedBroadcast(): Unit = {
    +    for (cached <- cachedSerializedBroadcast) removeBroadcast(cached._2)
    +    cachedSerializedBroadcast.clear()
    +  }
    +
       def getSerializedMapOutputStatuses(shuffleId: Int): Array[Byte] = {
         var statuses: Array[MapStatus] = null
    +    var retBytes: Array[Byte] = null
         var epochGotten: Long = -1
    -    epochLock.synchronized {
    -      if (epoch > cacheEpoch) {
    -        cachedSerializedStatuses.clear()
    -        cacheEpoch = epoch
    -      }
    -      cachedSerializedStatuses.get(shuffleId) match {
    -        case Some(bytes) =>
    -          return bytes
    -        case None =>
    -          statuses = mapStatuses.getOrElse(shuffleId, Array[MapStatus]())
    -          epochGotten = epoch
    +
    +    // Check to see if we have a cached version, returns true if it does
    +    // and has side effect of setting retBytes.  If not returns false
    +    // with side effect of setting statuses
    +    def checkCachedStatuses(): Boolean = {
    +      epochLock.synchronized {
    +        if (epoch > cacheEpoch) {
    --- End diff --
    
    When will this happen?


[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12113#discussion_r58536056
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -296,10 +290,89 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
       protected val mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
       private val cachedSerializedStatuses = new ConcurrentHashMap[Int, Array[Byte]]().asScala
     
    +  private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
    +
    +  // Kept in sync with cachedSerializedStatuses explicitly
    +  // This is required so that the Broadcast variable remains in scope until we remove
    +  // the shuffleId explicitly or implicitly.
    +  private val cachedSerializedBroadcast = new HashMap[Int, Broadcast[Array[Byte]]]()
    +
    +  // This is to prevent multiple serializations of the same shuffle - which happens when
    +  // there is a request storm when shuffle start.
    +  private val shuffleIdLocks = new ConcurrentHashMap[Int, AnyRef]()
    +
    +  // requests for map output statuses
    +  private val mapOutputRequests = new LinkedBlockingQueue[GetMapOutputMessage]
    +
    +  // Thread pool used for handling map output status requests. This is a separate thread pool
    +  // to ensure we don't block the normal dispatcher threads.
    +  private val threadpool: ThreadPoolExecutor = {
    +    val numThreads = conf.getInt("spark.shuffle.mapOutput.dispatcher.numThreads", 8)
    +    val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "map-output-dispatcher")
    +    for (i <- 0 until numThreads) {
    +      pool.execute(new MessageLoop)
    +    }
    +    pool
    +  }
    +
    +  def post(message: GetMapOutputMessage): Unit = {
    +    mapOutputRequests.offer(message)
    +  }
    +
    +  /** Message loop used for dispatching messages. */
    +  private class MessageLoop extends Runnable {
    +    override def run(): Unit = {
    +      try {
    +        while (true) {
    +          try {
    +            val data = mapOutputRequests.take()
    +             if (data == PoisonPill) {
    +              // Put PoisonPill back so that other MessageLoops can see it.
    +              mapOutputRequests.offer(PoisonPill)
    +              return
    +            }
    +            val context = data.context
    +            val shuffleId = data.shuffleId
    +            val hostPort = context.senderAddress.hostPort
    +            logDebug("Handling request to send map output locations for shuffle " + shuffleId +
    +              " to " + hostPort)
    +            val mapOutputStatuses = getSerializedMapOutputStatuses(shuffleId)
    --- End diff --
    
    Are you saying 7 additional seconds on top of the serialization?
    
    If so, you can try increasing the number of threads or, obviously, increasing the timeout.  What is your timeout set at?  Generally I've found that for any job that large I have to increase the timeouts anyway.


[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the pull request:

    https://github.com/apache/spark/pull/12113#issuecomment-211389760
  
    ping @rxin @JoshRosen  Any chance you guys could review?  This has been an issue for a long time with large jobs, so it would be nice to get it in.


[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the pull request:

    https://github.com/apache/spark/pull/12113#issuecomment-216233599
  
    ping @rxin 


[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12113#discussion_r62259367
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -296,10 +290,86 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
       protected val mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
       private val cachedSerializedStatuses = new ConcurrentHashMap[Int, Array[Byte]]().asScala
     
    +  private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
    +
    +  // Kept in sync with cachedSerializedStatuses explicitly
    +  // This is required so that the Broadcast variable remains in scope until we remove
    +  // the shuffleId explicitly or implicitly.
    +  private val cachedSerializedBroadcast = new HashMap[Int, Broadcast[Array[Byte]]]()
    +
    +  // This is to prevent multiple serializations of the same shuffle - which happens when
    +  // there is a request storm when shuffle start.
    +  private val shuffleIdLocks = new ConcurrentHashMap[Int, AnyRef]()
    +
    +  // requests for map output statuses
    +  private val mapOutputRequests = new LinkedBlockingQueue[GetMapOutputMessage]
    +
    +  // Thread pool used for handling map output status requests. This is a separate thread pool
    +  // to ensure we don't block the normal dispatcher threads.
    +  private val threadpool: ThreadPoolExecutor = {
    +    val numThreads = conf.getInt("spark.shuffle.mapOutput.dispatcher.numThreads", 8)
    +    val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "map-output-dispatcher")
    +    for (i <- 0 until numThreads) {
    +      pool.execute(new MessageLoop)
    +    }
    +    pool
    +  }
    +
    +  // Make sure that that we aren't going to exceed the max RPC message size by making sure
    +  // we use broadcast to send large map output statuses.
    +  if (minSizeForBroadcast > maxRpcMessageSize) {
    +    val msg = s"spark.shuffle.mapOutput.minSizeForBroadcast ($minSizeForBroadcast bytes) must " +
    +      s"be <= spark.rpc.message.maxSize ($maxRpcMessageSize bytes) to prevent sending an rpc " +
    +      "message that is to large."
    +    logError(msg)
    +    throw new IllegalArgumentException(msg)
    --- End diff --
    
    Should we use maxRpcMessageSize for minSizeForBroadcast and show a warning instead of throwing an exception?
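
The alternative raised in this comment (fall back to the RPC limit and warn, rather than fail) might look roughly like the sketch below. `ClampThresholdSketch` and `effectiveMinSizeForBroadcast` are hypothetical names, and returning the warning as an `Option[String]` is only a way to keep the example free of a logging dependency; it is not what the patch does.

```scala
object ClampThresholdSketch {
  // Returns the threshold to use plus an optional warning for the caller to log.
  def effectiveMinSizeForBroadcast(
      minSizeForBroadcast: Int,
      maxRpcMessageSize: Int): (Int, Option[String]) = {
    if (minSizeForBroadcast > maxRpcMessageSize) {
      val msg = s"spark.shuffle.mapOutput.minSizeForBroadcast ($minSizeForBroadcast bytes) " +
        s"exceeds spark.rpc.message.maxSize ($maxRpcMessageSize bytes); " +
        s"using $maxRpcMessageSize bytes instead."
      // Clamp so that anything too large for an RPC message is always broadcast.
      (maxRpcMessageSize, Some(msg))
    } else {
      (minSizeForBroadcast, None)
    }
  }
}
```

The trade-off is that clamping silently changes a value the user set explicitly, whereas the exception in the diff forces the misconfiguration to be fixed.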


[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12113#issuecomment-204608543
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/54719/


[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12113#discussion_r58379059
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -428,40 +503,93 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
         }
       }
     
    +  private def removeBroadcast(bcast: Broadcast[_]): Unit = {
    +    if (null != bcast) {
    +      broadcastManager.unbroadcast(bcast.id,
    +        removeFromDriver = true, blocking = false)
    +    }
    +  }
    +
    +  private def clearCachedBroadcast(): Unit = {
    +    for (cached <- cachedSerializedBroadcast) removeBroadcast(cached._2)
    +    cachedSerializedBroadcast.clear()
    +  }
    +
       def getSerializedMapOutputStatuses(shuffleId: Int): Array[Byte] = {
         var statuses: Array[MapStatus] = null
         var epochGotten: Long = -1
         epochLock.synchronized {
           if (epoch > cacheEpoch) {
             cachedSerializedStatuses.clear()
    +        clearCachedBroadcast()
             cacheEpoch = epoch
           }
           cachedSerializedStatuses.get(shuffleId) match {
             case Some(bytes) =>
               return bytes
             case None =>
    +          logDebug("cached status not found for : " + shuffleId)
               statuses = mapStatuses.getOrElse(shuffleId, Array[MapStatus]())
               epochGotten = epoch
           }
         }
    -    // If we got here, we failed to find the serialized locations in the cache, so we pulled
    -    // out a snapshot of the locations as "statuses"; let's serialize and return that
    -    val bytes = MapOutputTracker.serializeMapStatuses(statuses)
    -    logInfo("Size of output statuses for shuffle %d is %d bytes".format(shuffleId, bytes.length))
    -    // Add them into the table only if the epoch hasn't changed while we were working
    -    epochLock.synchronized {
    -      if (epoch == epochGotten) {
    -        cachedSerializedStatuses(shuffleId) = bytes
    +
    +    var shuffleIdLock = shuffleIdLocks.get(shuffleId)
    +    if (null == shuffleIdLock) {
    +      val newLock = new Object()
    +      // in general, this condition should be false - but good to be paranoid
    +      val prevLock = shuffleIdLocks.putIfAbsent(shuffleId, newLock)
    +      shuffleIdLock = if (null != prevLock) prevLock else newLock
    +    }
    +    val newbytes = shuffleIdLock.synchronized {
    +
    +      // double check to make sure someone else didn't serialize and cache the same
    +      // mapstatus while we were waiting on the synchronize
    +      epochLock.synchronized {
    +        if (epoch > cacheEpoch) {
    +          cachedSerializedStatuses.clear()
    +          clearCachedBroadcast()
    +          cacheEpoch = epoch
    +        }
    +        cachedSerializedStatuses.get(shuffleId) match {
    +          case Some(bytes) =>
    +            return bytes
    +          case None =>
    +            logDebug("shuffle lock cached status not found for : " + shuffleId)
    +            statuses = mapStatuses.getOrElse(shuffleId, Array[MapStatus]())
    +            epochGotten = epoch
    +        }
    +      }
    +
    +      // If we got here, we failed to find the serialized locations in the cache, so we pulled
    +      // out a snapshot of the locations as "statuses"; let's serialize and return that
    +      val (bytes, bcast) = MapOutputTracker.serializeMapStatuses(statuses, broadcastManager,
    +        isLocal, minSizeForBroadcast)
    +      logInfo("Size of output statuses for shuffle %d is %d bytes".format(shuffleId, bytes.length))
    +      // Add them into the table only if the epoch hasn't changed while we were working
    +      epochLock.synchronized {
    +        if (epoch == epochGotten) {
    +          cachedSerializedStatuses(shuffleId) = bytes
    +          if (null != bcast) cachedSerializedBroadcast(shuffleId) = bcast
    +        } else {
    +          logInfo("Epoch changed, not caching!")
    +          removeBroadcast(bcast)
    +        }
           }
    +      bytes
         }
    -    bytes
    +    newbytes
       }
     
       override def stop() {
    +    mapOutputRequests.offer(PoisonPill)
    +    threadpool.shutdown()
         sendTracker(StopMapOutputTracker)
         mapStatuses.clear()
         trackerEndpoint = null
         cachedSerializedStatuses.clear()
    +    clearCachedBroadcast()
    --- End diff --
    
    Yes, I noticed this, but this is a general issue with broadcast.  You can cause the same issue by having the user broadcast the data.  I was planning on filing a separate JIRA to look at this, but it's really just a timing issue with shutdown.


[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/12113#issuecomment-217046503
  
    FYI #11886 seems related


[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12113#discussion_r58252476
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -477,12 +605,16 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
     private[spark] object MapOutputTracker extends Logging {
     
       val ENDPOINT_NAME = "MapOutputTracker"
    +  private val DIRECT = 0
    +  private val BROADCAST = 1
     
       // Serialize an array of map output locations into an efficient byte format so that we can send
       // it to reduce tasks. We do this by compressing the serialized bytes using GZIP. They will
       // generally be pretty compressible because many map outputs will be on the same hostname.
    -  def serializeMapStatuses(statuses: Array[MapStatus]): Array[Byte] = {
    +  def serializeMapStatuses(statuses: Array[MapStatus], broadcastManager: BroadcastManager,
    +      isLocal: Boolean, minBroadcastSize: Int): (Array[Byte], Broadcast[Array[Byte]]) = {
    --- End diff --
    
    We should probably make this an Either ?
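
One possible reading of this suggestion is sketched below, assuming the broadcast case still has to return both the small message and the broadcast handle. `EitherResultSketch`, `BroadcastHandle`, and the placeholder values are hypothetical and only illustrate the shape of the return type.

```scala
object EitherResultSketch {
  // Hypothetical stand-in for Broadcast[Array[Byte]]; not a Spark class.
  final case class BroadcastHandle(id: Long)

  // Left: bytes sent directly. Right: small message plus the broadcast handle.
  type Result = Either[Array[Byte], (Array[Byte], BroadcastHandle)]

  def serialize(payload: Array[Byte], minBroadcastSize: Int): Result = {
    if (minBroadcastSize >= 0 && payload.length >= minBroadcastSize) {
      val handle = BroadcastHandle(payload.length.toLong) // placeholder id
      val small = Array[Byte](1) // placeholder for the small tagged message
      Right((small, handle))
    } else {
      Left(payload)
    }
  }

  // Callers pattern-match instead of null-checking a tuple element.
  def bytesToSend(result: Result): Array[Byte] = result match {
    case Left(direct) => direct
    case Right((small, _)) => small
  }
}
```

Compared with the `(Array[Byte], Broadcast[Array[Byte]])` tuple in the diff, this removes the `null` second element, at the cost of a more involved type at every call site.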


[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the pull request:

    https://github.com/apache/spark/pull/12113#issuecomment-217651763
  
    Thanks @davies


[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12113#discussion_r62116124
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -428,40 +503,89 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
         }
       }
     
    +  private def removeBroadcast(bcast: Broadcast[_]): Unit = {
    +    if (null != bcast) {
    +      broadcastManager.unbroadcast(bcast.id,
    +        removeFromDriver = true, blocking = false)
    +    }
    +  }
    +
    +  private def clearCachedBroadcast(): Unit = {
    +    for (cached <- cachedSerializedBroadcast) removeBroadcast(cached._2)
    +    cachedSerializedBroadcast.clear()
    +  }
    +
       def getSerializedMapOutputStatuses(shuffleId: Int): Array[Byte] = {
         var statuses: Array[MapStatus] = null
    +    var retBytes: Array[Byte] = null
         var epochGotten: Long = -1
    -    epochLock.synchronized {
    -      if (epoch > cacheEpoch) {
    -        cachedSerializedStatuses.clear()
    -        cacheEpoch = epoch
    -      }
    -      cachedSerializedStatuses.get(shuffleId) match {
    -        case Some(bytes) =>
    -          return bytes
    -        case None =>
    -          statuses = mapStatuses.getOrElse(shuffleId, Array[MapStatus]())
    -          epochGotten = epoch
    +
    +    // Check to see if we have a cached version, returns true if it does
    +    // and has side effect of setting retBytes.  If not returns false
    +    // with side effect of setting statuses
    +    def checkCachedStatuses(): Boolean = {
    +      epochLock.synchronized {
    +        if (epoch > cacheEpoch) {
    --- End diff --
    
    In this case, most of the shuffles will be affected; makes sense.


[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12113#discussion_r58254590
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -477,12 +605,16 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
     private[spark] object MapOutputTracker extends Logging {
     
       val ENDPOINT_NAME = "MapOutputTracker"
    +  private val DIRECT = 0
    +  private val BROADCAST = 1
     
       // Serialize an array of map output locations into an efficient byte format so that we can send
       // it to reduce tasks. We do this by compressing the serialized bytes using GZIP. They will
       // generally be pretty compressible because many map outputs will be on the same hostname.
    -  def serializeMapStatuses(statuses: Array[MapStatus]): Array[Byte] = {
    +  def serializeMapStatuses(statuses: Array[MapStatus], broadcastManager: BroadcastManager,
    +      isLocal: Boolean, minBroadcastSize: Int): (Array[Byte], Broadcast[Array[Byte]]) = {
    --- End diff --
    
    Good point, I missed that :-)
    
    On Friday, April 1, 2016, Tom Graves <no...@github.com> wrote:
    
    > In core/src/main/scala/org/apache/spark/MapOutputTracker.scala
    > <https://github.com/apache/spark/pull/12113#discussion_r58253255>:
    >
    > >
    > >    // Serialize an array of map output locations into an efficient byte format so that we can send
    > >    // it to reduce tasks. We do this by compressing the serialized bytes using GZIP. They will
    > >    // generally be pretty compressible because many map outputs will be on the same hostname.
    > > -  def serializeMapStatuses(statuses: Array[MapStatus]): Array[Byte] = {
    > > +  def serializeMapStatuses(statuses: Array[MapStatus], broadcastManager: BroadcastManager,
    > > +      isLocal: Boolean, minBroadcastSize: Int): (Array[Byte], Broadcast[Array[Byte]]) = {
    >
    > Both values are used in getSerializedMapOutputStatuses





[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12113#issuecomment-217560892
  
    **[Test build #58021 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/58021/consoleFull)** for PR 12113 at commit [`396632a`](https://github.com/apache/spark/commit/396632a030e2fc5618e5946e735b9fecacd2bda1).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12113#discussion_r62115577
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -428,40 +503,89 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
         }
       }
     
    +  private def removeBroadcast(bcast: Broadcast[_]): Unit = {
    +    if (null != bcast) {
    +      broadcastManager.unbroadcast(bcast.id,
    +        removeFromDriver = true, blocking = false)
    +    }
    +  }
    +
    +  private def clearCachedBroadcast(): Unit = {
    +    for (cached <- cachedSerializedBroadcast) removeBroadcast(cached._2)
    +    cachedSerializedBroadcast.clear()
    +  }
    +
       def getSerializedMapOutputStatuses(shuffleId: Int): Array[Byte] = {
         var statuses: Array[MapStatus] = null
    +    var retBytes: Array[Byte] = null
         var epochGotten: Long = -1
    -    epochLock.synchronized {
    -      if (epoch > cacheEpoch) {
    -        cachedSerializedStatuses.clear()
    -        cacheEpoch = epoch
    -      }
    -      cachedSerializedStatuses.get(shuffleId) match {
    -        case Some(bytes) =>
    -          return bytes
    -        case None =>
    -          statuses = mapStatuses.getOrElse(shuffleId, Array[MapStatus]())
    -          epochGotten = epoch
    +
    +    // Check to see if we have a cached version, returns true if it does
    +    // and has side effect of setting retBytes.  If not returns false
    +    // with side effect of setting statuses
    +    def checkCachedStatuses(): Boolean = {
    +      epochLock.synchronized {
    +        if (epoch > cacheEpoch) {
    --- End diff --
    
    This happens whenever anything changes with the map output, for instance when you lose an executor or get enough fetch failures for a map output. The map output statuses in the cache are no longer valid at that point.
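    
    A minimal sketch of this epoch-based invalidation, assuming a simplified cache. `EpochCache`, `currentEpoch`, `incrementEpoch` and `put` are hypothetical names used for illustration only, not the code in this PR:
    
    ```scala
    import scala.collection.mutable

    // Hypothetical, simplified stand-in for the driver-side cache; not the PR's code.
    class EpochCache[V] {
      private val lock = new Object
      private var epoch = 0L       // bumped whenever map output changes
      private var cacheEpoch = 0L  // epoch for which the cached entries are valid
      private val cache = mutable.HashMap.empty[Int, V]

      // Must be called while holding `lock`: drops all entries if the epoch moved on.
      private def dropIfStale(): Unit = {
        if (epoch > cacheEpoch) {
          cache.clear()
          cacheEpoch = epoch
        }
      }

      def currentEpoch: Long = lock.synchronized { epoch }

      // Call when map output changes, e.g. an executor is lost.
      def incrementEpoch(): Unit = lock.synchronized { epoch += 1 }

      // Looks up a shuffle after discarding entries from an older epoch.
      def get(shuffleId: Int): Option[V] = lock.synchronized {
        dropIfStale()
        cache.get(shuffleId)
      }

      // Caches only if the epoch is still the one the value was computed under.
      def put(shuffleId: Int, value: V, epochGotten: Long): Boolean = lock.synchronized {
        dropIfStale()
        if (epoch == epochGotten) { cache(shuffleId) = value; true } else false
      }
    }
    ```
    
    In this sketch a single epoch bump clears the entries for every shuffle at once, and a value computed under an older epoch is rejected by `put` rather than cached.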




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12113#issuecomment-217533562
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12113#discussion_r58536751
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -428,40 +503,93 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
         }
       }
     
    +  private def removeBroadcast(bcast: Broadcast[_]): Unit = {
    +    if (null != bcast) {
    +      broadcastManager.unbroadcast(bcast.id,
    +        removeFromDriver = true, blocking = false)
    +    }
    +  }
    +
    +  private def clearCachedBroadcast(): Unit = {
    +    for (cached <- cachedSerializedBroadcast) removeBroadcast(cached._2)
    +    cachedSerializedBroadcast.clear()
    +  }
    +
       def getSerializedMapOutputStatuses(shuffleId: Int): Array[Byte] = {
         var statuses: Array[MapStatus] = null
         var epochGotten: Long = -1
         epochLock.synchronized {
           if (epoch > cacheEpoch) {
             cachedSerializedStatuses.clear()
    +        clearCachedBroadcast()
             cacheEpoch = epoch
           }
           cachedSerializedStatuses.get(shuffleId) match {
             case Some(bytes) =>
               return bytes
             case None =>
    +          logDebug("cached status not found for : " + shuffleId)
               statuses = mapStatuses.getOrElse(shuffleId, Array[MapStatus]())
               epochGotten = epoch
           }
         }
    -    // If we got here, we failed to find the serialized locations in the cache, so we pulled
    -    // out a snapshot of the locations as "statuses"; let's serialize and return that
    -    val bytes = MapOutputTracker.serializeMapStatuses(statuses)
    -    logInfo("Size of output statuses for shuffle %d is %d bytes".format(shuffleId, bytes.length))
    -    // Add them into the table only if the epoch hasn't changed while we were working
    -    epochLock.synchronized {
    -      if (epoch == epochGotten) {
    -        cachedSerializedStatuses(shuffleId) = bytes
    +
    +    var shuffleIdLock = shuffleIdLocks.get(shuffleId)
    +    if (null == shuffleIdLock) {
    +      val newLock = new Object()
    +      // in general, this condition should be false - but good to be paranoid
    +      val prevLock = shuffleIdLocks.putIfAbsent(shuffleId, newLock)
    +      shuffleIdLock = if (null != prevLock) prevLock else newLock
    +    }
    +    val newbytes = shuffleIdLock.synchronized {
    +
    +      // double check to make sure someone else didn't serialize and cache the same
    +      // mapstatus while we were waiting on the synchronize
    +      epochLock.synchronized {
    +        if (epoch > cacheEpoch) {
    +          cachedSerializedStatuses.clear()
    +          clearCachedBroadcast()
    +          cacheEpoch = epoch
    +        }
    +        cachedSerializedStatuses.get(shuffleId) match {
    +          case Some(bytes) =>
    +            return bytes
    +          case None =>
    +            logDebug("shuffle lock cached status not found for : " + shuffleId)
    +            statuses = mapStatuses.getOrElse(shuffleId, Array[MapStatus]())
    +            epochGotten = epoch
    +        }
    +      }
    +
    +      // If we got here, we failed to find the serialized locations in the cache, so we pulled
    +      // out a snapshot of the locations as "statuses"; let's serialize and return that
    +      val (bytes, bcast) = MapOutputTracker.serializeMapStatuses(statuses, broadcastManager,
    +        isLocal, minSizeForBroadcast)
    +      logInfo("Size of output statuses for shuffle %d is %d bytes".format(shuffleId, bytes.length))
    +      // Add them into the table only if the epoch hasn't changed while we were working
    +      epochLock.synchronized {
    +        if (epoch == epochGotten) {
    +          cachedSerializedStatuses(shuffleId) = bytes
    +          if (null != bcast) cachedSerializedBroadcast(shuffleId) = bcast
    +        } else {
    +          logInfo("Epoch changed, not caching!")
    +          removeBroadcast(bcast)
    +        }
           }
    +      bytes
         }
    -    bytes
    +    newbytes
       }
     
       override def stop() {
    +    mapOutputRequests.offer(PoisonPill)
    +    threadpool.shutdown()
         sendTracker(StopMapOutputTracker)
         mapStatuses.clear()
         trackerEndpoint = null
         cachedSerializedStatuses.clear()
    +    clearCachedBroadcast()
    --- End diff --
    
    https://issues.apache.org/jira/browse/SPARK-14405




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the pull request:

    https://github.com/apache/spark/pull/12113#issuecomment-207111462
  
    Looks good to me; would be nice to get feedback from @rxin or @JoshRosen also ...




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12113#discussion_r58641966
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -296,10 +290,89 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
       protected val mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
       private val cachedSerializedStatuses = new ConcurrentHashMap[Int, Array[Byte]]().asScala
     
    +  private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
    +
    +  // Kept in sync with cachedSerializedStatuses explicitly
    +  // This is required so that the Broadcast variable remains in scope until we remove
    +  // the shuffleId explicitly or implicitly.
    +  private val cachedSerializedBroadcast = new HashMap[Int, Broadcast[Array[Byte]]]()
    +
    +  // This is to prevent multiple serializations of the same shuffle - which happens when
    +  // there is a request storm when shuffle start.
    +  private val shuffleIdLocks = new ConcurrentHashMap[Int, AnyRef]()
    +
    +  // requests for map output statuses
    +  private val mapOutputRequests = new LinkedBlockingQueue[GetMapOutputMessage]
    +
    +  // Thread pool used for handling map output status requests. This is a separate thread pool
    +  // to ensure we don't block the normal dispatcher threads.
    +  private val threadpool: ThreadPoolExecutor = {
    +    val numThreads = conf.getInt("spark.shuffle.mapOutput.dispatcher.numThreads", 8)
    +    val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "map-output-dispatcher")
    +    for (i <- 0 until numThreads) {
    +      pool.execute(new MessageLoop)
    +    }
    +    pool
    +  }
    +
    +  def post(message: GetMapOutputMessage): Unit = {
    +    mapOutputRequests.offer(message)
    +  }
    +
    +  /** Message loop used for dispatching messages. */
    +  private class MessageLoop extends Runnable {
    +    override def run(): Unit = {
    +      try {
    +        while (true) {
    +          try {
    +            val data = mapOutputRequests.take()
    +             if (data == PoisonPill) {
    +              // Put PoisonPill back so that other MessageLoops can see it.
    +              mapOutputRequests.offer(PoisonPill)
    +              return
    +            }
    +            val context = data.context
    +            val shuffleId = data.shuffleId
    +            val hostPort = context.senderAddress.hostPort
    +            logDebug("Handling request to send map output locations for shuffle " + shuffleId +
    +              " to " + hostPort)
    +            val mapOutputStatuses = getSerializedMapOutputStatuses(shuffleId)
    --- End diff --
    
    > Are you saying 7 additional seconds on top of the serialization?
    
    I mean that calling `getSerializedMapOutputStatuses` takes about seven seconds, but this figure is not exact; it is an estimate from the production logs.
    
    
    > What is your timeout set at?
    
    By default, no change.
    
    > How long is it taking for the entire serialization (even without this change)?
    
    The master branch throws
    ```
    org.apache.spark.shuffle.FetchFailedException: java.lang.OutOfMemoryError: Direct buffer memory
    ``` 




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12113#discussion_r62111647
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -428,40 +503,89 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
         }
       }
     
    +  private def removeBroadcast(bcast: Broadcast[_]): Unit = {
    +    if (null != bcast) {
    +      broadcastManager.unbroadcast(bcast.id,
    +        removeFromDriver = true, blocking = false)
    +    }
    +  }
    +
    +  private def clearCachedBroadcast(): Unit = {
    +    for (cached <- cachedSerializedBroadcast) removeBroadcast(cached._2)
    +    cachedSerializedBroadcast.clear()
    +  }
    +
       def getSerializedMapOutputStatuses(shuffleId: Int): Array[Byte] = {
         var statuses: Array[MapStatus] = null
    +    var retBytes: Array[Byte] = null
         var epochGotten: Long = -1
    -    epochLock.synchronized {
    -      if (epoch > cacheEpoch) {
    -        cachedSerializedStatuses.clear()
    -        cacheEpoch = epoch
    -      }
    -      cachedSerializedStatuses.get(shuffleId) match {
    -        case Some(bytes) =>
    -          return bytes
    -        case None =>
    -          statuses = mapStatuses.getOrElse(shuffleId, Array[MapStatus]())
    -          epochGotten = epoch
    +
    +    // Check to see if we have a cached version, returns true if it does
    +    // and has side effect of setting retBytes.  If not returns false
    +    // with side effect of setting statuses
    +    def checkCachedStatuses(): Boolean = {
    +      epochLock.synchronized {
    +        if (epoch > cacheEpoch) {
    +          cachedSerializedStatuses.clear()
    +          clearCachedBroadcast()
    +          cacheEpoch = epoch
    +        }
    +        cachedSerializedStatuses.get(shuffleId) match {
    +          case Some(bytes) =>
    +            retBytes = bytes
    +            true
    +          case None =>
    +            logDebug("cached status not found for : " + shuffleId)
    +            statuses = mapStatuses.getOrElse(shuffleId, Array[MapStatus]())
    +            epochGotten = epoch
    +            false
    +        }
           }
         }
    -    // If we got here, we failed to find the serialized locations in the cache, so we pulled
    -    // out a snapshot of the locations as "statuses"; let's serialize and return that
    -    val bytes = MapOutputTracker.serializeMapStatuses(statuses)
    -    logInfo("Size of output statuses for shuffle %d is %d bytes".format(shuffleId, bytes.length))
    -    // Add them into the table only if the epoch hasn't changed while we were working
    -    epochLock.synchronized {
    -      if (epoch == epochGotten) {
    -        cachedSerializedStatuses(shuffleId) = bytes
    +
    +    if (checkCachedStatuses()) return retBytes
    +    var shuffleIdLock = shuffleIdLocks.get(shuffleId)
    +    if (null == shuffleIdLock) {
    +      val newLock = new Object()
    +      // in general, this condition should be false - but good to be paranoid
    +      val prevLock = shuffleIdLocks.putIfAbsent(shuffleId, newLock)
    --- End diff --
    
    We add a lock in registerShuffle(), so when would this happen?
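    
    For context, the lookup in question follows the usual per-key lock pattern with a `putIfAbsent` fallback. A minimal sketch, assuming a generic compute function; `PerKeyOnce`, `lockFor` and `computations` are hypothetical names, not the PR's code:
    
    ```scala
    import java.util.concurrent.ConcurrentHashMap
    import java.util.concurrent.atomic.AtomicInteger

    // Hypothetical helper: computes the value for a key at most once, even under a request storm.
    class PerKeyOnce[V <: AnyRef](compute: Int => V) {
      private val locks = new ConcurrentHashMap[Int, AnyRef]()
      private val cache = new ConcurrentHashMap[Int, V]()
      val computations = new AtomicInteger(0)

      // Returns the lock for a key, creating one only if none was registered before.
      private def lockFor(key: Int): AnyRef = {
        val existing = locks.get(key)
        if (existing != null) existing
        else {
          val newLock = new Object()
          // putIfAbsent returns the previous lock if another thread won the race.
          val prev = locks.putIfAbsent(key, newLock)
          if (prev != null) prev else newLock
        }
      }

      def get(key: Int): V = {
        val fast = cache.get(key)
        if (fast != null) return fast
        lockFor(key).synchronized {
          // Double check: another thread may have computed it while we waited.
          val again = cache.get(key)
          if (again != null) again
          else {
            computations.incrementAndGet()
            val v = compute(key)
            cache.put(key, v)
            v
          }
        }
      }
    }
    ```
    
    If a lock is always registered up front, the `putIfAbsent` branch in `lockFor` is never taken; it only covers callers that arrive for a key nobody registered.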




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12113#discussion_r58950637
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -492,16 +624,51 @@ private[spark] object MapOutputTracker extends Logging {
         } {
           objOut.close()
         }
    -    out.toByteArray
    +    val arr = out.toByteArray
    +    if (minBroadcastSize >= 0 && arr.length >= minBroadcastSize) {
    +      // Use broadcast instead.
    +      // Important arr(0) is the tag == DIRECT, ignore that while deserializing !
    +      val bcast = broadcastManager.newBroadcast(arr, isLocal)
    +      // toByteArray creates copy, so we can reuse out
    +      out.reset()
    +      out.write(BROADCAST)
    +      val oos = new ObjectOutputStream(new GZIPOutputStream(out))
    +      oos.writeObject(bcast)
    +      oos.close()
    +      val outArr = out.toByteArray
    +      logInfo("Broadcast mapstatuses size = " + outArr.length + ", actual size = " + arr.length)
    +      (outArr, bcast)
    +    } else {
    +      (arr, null)
    +    }
       }
     
       // Opposite of serializeMapStatuses.
       def deserializeMapStatuses(bytes: Array[Byte]): Array[MapStatus] = {
    -    val objIn = new ObjectInputStream(new GZIPInputStream(new ByteArrayInputStream(bytes)))
    -    Utils.tryWithSafeFinally {
    -      objIn.readObject().asInstanceOf[Array[MapStatus]]
    -    } {
    -      objIn.close()
    +    assert (bytes.length > 0)
    +
    +    def deserializeObject(arr: Array[Byte], off: Int, len: Int): AnyRef = {
    +      val objIn = new ObjectInputStream(new GZIPInputStream(
    +        new ByteArrayInputStream(arr, off, len)))
    +      Utils.tryWithSafeFinally {
    +        objIn.readObject()
    +      } {
    +        objIn.close()
    +      }
    +    }
    +
    +    bytes(0) match {
    +      case DIRECT =>
    +        deserializeObject(bytes, 1, bytes.length - 1).asInstanceOf[Array[MapStatus]]
    +      case BROADCAST =>
    +        // deserialize the Broadcast, pull .value array out of it, and then deserialize that
    +        val bcast = deserializeObject(bytes, 1, bytes.length - 1).
    +          asInstanceOf[Broadcast[Array[Byte]]]
    +        logInfo("Broadcast mapstatuses size = " + bytes.length +
    +          ", actual size = " + bcast.value.length)
    +        // Important - ignore the DIRECT tag ! Start from offset 1
    +        deserializeObject(bcast.value, 1, bcast.value.length - 1).asInstanceOf[Array[MapStatus]]
    --- End diff --
    
    I am not sure I follow. Are your concerns about how broadcast itself works, or are they specific to this change?
    Essentially @tgravescs is leveraging the broadcast mechanism to send the 'larger' statuses. Broadcast follows a different code path to pull the data to executors, which alleviates load on the master while the data is pulled (chunking, pulling from other executors if available, etc.).
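    
    A minimal sketch of the tagging scheme in the diff above, with the broadcast replaced by an in-memory map so it runs without Spark. `TaggedStatuses`, `store` and the use of `Array[String]` in place of `Array[MapStatus]` are assumptions made for illustration; the real code hands the bytes to the BroadcastManager instead:
    
    ```scala
    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
    import java.util.zip.{GZIPInputStream, GZIPOutputStream}
    import scala.collection.mutable

    object TaggedStatuses {
      val DIRECT = 0
      val BROADCAST = 1

      // Hypothetical stand-in for the broadcast mechanism: id -> DIRECT-tagged payload.
      private val store = mutable.HashMap.empty[Long, Array[Byte]]
      private var nextId = 0L

      // Writes a one-byte tag followed by the GZIP-compressed serialized object.
      private def writeTagged(tag: Int, obj: AnyRef): Array[Byte] = {
        val out = new ByteArrayOutputStream()
        out.write(tag)
        val oos = new ObjectOutputStream(new GZIPOutputStream(out))
        try oos.writeObject(obj) finally oos.close()
        out.toByteArray
      }

      private def readObject(arr: Array[Byte], off: Int, len: Int): AnyRef = {
        val in = new ObjectInputStream(new GZIPInputStream(new ByteArrayInputStream(arr, off, len)))
        try in.readObject() finally in.close()
      }

      def serialize(statuses: Array[String], minBroadcastSize: Int): Array[Byte] = {
        val direct = writeTagged(DIRECT, statuses)
        if (minBroadcastSize >= 0 && direct.length >= minBroadcastSize) {
          // Large payload: park it in the store and send only a small handle.
          val id = synchronized { nextId += 1; store(nextId) = direct; nextId }
          writeTagged(BROADCAST, java.lang.Long.valueOf(id))
        } else direct
      }

      def deserialize(bytes: Array[Byte]): Array[String] = {
        require(bytes.length > 0)
        bytes(0).toInt match {
          case DIRECT =>
            readObject(bytes, 1, bytes.length - 1).asInstanceOf[Array[String]]
          case BROADCAST =>
            val id = readObject(bytes, 1, bytes.length - 1).asInstanceOf[java.lang.Long].longValue
            val payload = synchronized { store(id) }
            // The stored payload still carries its DIRECT tag, so start at offset 1.
            readObject(payload, 1, payload.length - 1).asInstanceOf[Array[String]]
          case other =>
            throw new IllegalArgumentException("Unexpected tag: " + other)
        }
      }
    }
    ```
    
    The message sent over RPC stays small in the broadcast case because it carries only the handle; the receiver resolves the handle and then decodes the payload exactly as in the direct case.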




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12113#issuecomment-204608542
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12113#discussion_r62110398
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -296,10 +290,89 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
       protected val mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
       private val cachedSerializedStatuses = new ConcurrentHashMap[Int, Array[Byte]]().asScala
     
    +  private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
    +
    +  // Kept in sync with cachedSerializedStatuses explicitly
    +  // This is required so that the Broadcast variable remains in scope until we remove
    +  // the shuffleId explicitly or implicitly.
    +  private val cachedSerializedBroadcast = new HashMap[Int, Broadcast[Array[Byte]]]()
    +
    +  // This is to prevent multiple serializations of the same shuffle - which happens when
    +  // there is a request storm when shuffle start.
    +  private val shuffleIdLocks = new ConcurrentHashMap[Int, AnyRef]()
    +
    +  // requests for map output statuses
    +  private val mapOutputRequests = new LinkedBlockingQueue[GetMapOutputMessage]
    +
    +  // Thread pool used for handling map output status requests. This is a separate thread pool
    +  // to ensure we don't block the normal dispatcher threads.
    +  private val threadpool: ThreadPoolExecutor = {
    +    val numThreads = conf.getInt("spark.shuffle.mapOutput.dispatcher.numThreads", 8)
    +    val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "map-output-dispatcher")
    +    for (i <- 0 until numThreads) {
    +      pool.execute(new MessageLoop)
    +    }
    +    pool
    +  }
    +
    +  def post(message: GetMapOutputMessage): Unit = {
    +    mapOutputRequests.offer(message)
    +  }
    +
    +  /** Message loop used for dispatching messages. */
    +  private class MessageLoop extends Runnable {
    +    override def run(): Unit = {
    +      try {
    +        while (true) {
    +          try {
    +            val data = mapOutputRequests.take()
    +             if (data == PoisonPill) {
    +              // Put PoisonPill back so that other MessageLoops can see it.
    +              mapOutputRequests.offer(PoisonPill)
    +              return
    +            }
    +            val context = data.context
    +            val shuffleId = data.shuffleId
    +            val hostPort = context.senderAddress.hostPort
    +            logDebug("Handling request to send map output locations for shuffle " + shuffleId +
    +              " to " + hostPort)
    +            val mapOutputStatuses = getSerializedMapOutputStatuses(shuffleId)
    +            val serializedSize = mapOutputStatuses.length
    +            if (serializedSize > maxRpcMessageSize) {
    +              val msg = s"Map output statuses were $serializedSize bytes which " +
    +                s"exceeds spark.rpc.message.maxSize ($maxRpcMessageSize bytes)."
    +
    +              // For SPARK-1244 we'll opt for just logging an error and then sending it to
    +              // the sender. A bigger refactoring (SPARK-1239) will ultimately remove this
    --- End diff --
    
    This PR is SPARK-1239; what should we do about this comment?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12113#issuecomment-207123789
  
    **[Test build #55244 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/55244/consoleFull)** for PR 12113 at commit [`4d46b96`](https://github.com/apache/spark/commit/4d46b9607e0ea666abd01b32acd9a244eacd1570).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12113#discussion_r58252199
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -428,40 +503,93 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
         }
       }
     
    +  private def removeBroadcast(bcast: Broadcast[_]): Unit = {
    +    if (null != bcast) {
    +      broadcastManager.unbroadcast(bcast.id,
    +        removeFromDriver = true, blocking = false)
    +    }
    +  }
    +
    +  private def clearCachedBroadcast(): Unit = {
    +    for (cached <- cachedSerializedBroadcast) removeBroadcast(cached._2)
    +    cachedSerializedBroadcast.clear()
    +  }
    +
       def getSerializedMapOutputStatuses(shuffleId: Int): Array[Byte] = {
         var statuses: Array[MapStatus] = null
         var epochGotten: Long = -1
         epochLock.synchronized {
           if (epoch > cacheEpoch) {
             cachedSerializedStatuses.clear()
    +        clearCachedBroadcast()
             cacheEpoch = epoch
           }
           cachedSerializedStatuses.get(shuffleId) match {
             case Some(bytes) =>
               return bytes
             case None =>
    +          logDebug("cached status not found for : " + shuffleId)
               statuses = mapStatuses.getOrElse(shuffleId, Array[MapStatus]())
               epochGotten = epoch
           }
         }
    -    // If we got here, we failed to find the serialized locations in the cache, so we pulled
    -    // out a snapshot of the locations as "statuses"; let's serialize and return that
    -    val bytes = MapOutputTracker.serializeMapStatuses(statuses)
    -    logInfo("Size of output statuses for shuffle %d is %d bytes".format(shuffleId, bytes.length))
    -    // Add them into the table only if the epoch hasn't changed while we were working
    -    epochLock.synchronized {
    -      if (epoch == epochGotten) {
    -        cachedSerializedStatuses(shuffleId) = bytes
    +
    +    var shuffleIdLock = shuffleIdLocks.get(shuffleId)
    +    if (null == shuffleIdLock) {
    +      val newLock = new Object()
    +      // in general, this condition should be false - but good to be paranoid
    +      val prevLock = shuffleIdLocks.putIfAbsent(shuffleId, newLock)
    +      shuffleIdLock = if (null != prevLock) prevLock else newLock
    +    }
    +    val newbytes = shuffleIdLock.synchronized {
    +
    +      // double check to make sure someone else didn't serialize and cache the same
    +      // mapstatus while we were waiting on the synchronize
    +      epochLock.synchronized {
    +        if (epoch > cacheEpoch) {
    +          cachedSerializedStatuses.clear()
    +          clearCachedBroadcast()
    +          cacheEpoch = epoch
    +        }
    +        cachedSerializedStatuses.get(shuffleId) match {
    +          case Some(bytes) =>
    +            return bytes
    +          case None =>
    +            logDebug("shuffle lock cached status not found for : " + shuffleId)
    +            statuses = mapStatuses.getOrElse(shuffleId, Array[MapStatus]())
    +            epochGotten = epoch
    +        }
    +      }
    --- End diff --
    
    We can pull this block out if it is getting repeated multiple times.




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12113#issuecomment-207124014
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12113#discussion_r62129859
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -296,10 +290,89 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
       protected val mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
       private val cachedSerializedStatuses = new ConcurrentHashMap[Int, Array[Byte]]().asScala
     
    +  private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
    +
    +  // Kept in sync with cachedSerializedStatuses explicitly
    +  // This is required so that the Broadcast variable remains in scope until we remove
    +  // the shuffleId explicitly or implicitly.
    +  private val cachedSerializedBroadcast = new HashMap[Int, Broadcast[Array[Byte]]]()
    +
    +  // This is to prevent multiple serializations of the same shuffle - which happens when
    +  // there is a request storm when a shuffle starts.
    +  private val shuffleIdLocks = new ConcurrentHashMap[Int, AnyRef]()
    +
    +  // requests for map output statuses
    +  private val mapOutputRequests = new LinkedBlockingQueue[GetMapOutputMessage]
    +
    +  // Thread pool used for handling map output status requests. This is a separate thread pool
    +  // to ensure we don't block the normal dispatcher threads.
    +  private val threadpool: ThreadPoolExecutor = {
    +    val numThreads = conf.getInt("spark.shuffle.mapOutput.dispatcher.numThreads", 8)
    +    val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "map-output-dispatcher")
    +    for (i <- 0 until numThreads) {
    +      pool.execute(new MessageLoop)
    +    }
    +    pool
    +  }
    +
    +  def post(message: GetMapOutputMessage): Unit = {
    +    mapOutputRequests.offer(message)
    +  }
    +
    +  /** Message loop used for dispatching messages. */
    +  private class MessageLoop extends Runnable {
    +    override def run(): Unit = {
    +      try {
    +        while (true) {
    +          try {
    +            val data = mapOutputRequests.take()
    +             if (data == PoisonPill) {
    +              // Put PoisonPill back so that other MessageLoops can see it.
    +              mapOutputRequests.offer(PoisonPill)
    +              return
    +            }
    +            val context = data.context
    +            val shuffleId = data.shuffleId
    +            val hostPort = context.senderAddress.hostPort
    +            logDebug("Handling request to send map output locations for shuffle " + shuffleId +
    +              " to " + hostPort)
    +            val mapOutputStatuses = getSerializedMapOutputStatuses(shuffleId)
    +            val serializedSize = mapOutputStatuses.length
    +            if (serializedSize > maxRpcMessageSize) {
    +              val msg = s"Map output statuses were $serializedSize bytes which " +
    +                s"exceeds spark.rpc.message.maxSize ($maxRpcMessageSize bytes)."
    +
    +              // For SPARK-1244 we'll opt for just logging an error and then sending it to
    +              // the sender. A bigger refactoring (SPARK-1239) will ultimately remove this
    --- End diff --
    
    @davies @tgravescs If we are to remove existing functionality, then we need to add a check when initializing minSizeForBroadcast to ensure it is <= maxRpcMessageSize.
    In general, I don't like coupling unrelated functionality, though here it looks more logical to do so (the alternative is failure at runtime!).




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12113#discussion_r62327844
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -296,10 +290,86 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
       protected val mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
       private val cachedSerializedStatuses = new ConcurrentHashMap[Int, Array[Byte]]().asScala
     
    +  private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
    +
    +  // Kept in sync with cachedSerializedStatuses explicitly
    +  // This is required so that the Broadcast variable remains in scope until we remove
    +  // the shuffleId explicitly or implicitly.
    +  private val cachedSerializedBroadcast = new HashMap[Int, Broadcast[Array[Byte]]]()
    +
    +  // This is to prevent multiple serializations of the same shuffle - which happens when
    +  // there is a request storm when a shuffle starts.
    +  private val shuffleIdLocks = new ConcurrentHashMap[Int, AnyRef]()
    +
    +  // requests for map output statuses
    +  private val mapOutputRequests = new LinkedBlockingQueue[GetMapOutputMessage]
    +
    +  // Thread pool used for handling map output status requests. This is a separate thread pool
    +  // to ensure we don't block the normal dispatcher threads.
    +  private val threadpool: ThreadPoolExecutor = {
    +    val numThreads = conf.getInt("spark.shuffle.mapOutput.dispatcher.numThreads", 8)
    +    val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "map-output-dispatcher")
    +    for (i <- 0 until numThreads) {
    +      pool.execute(new MessageLoop)
    +    }
    +    pool
    +  }
    +
    +  // Make sure that we aren't going to exceed the max RPC message size by making sure
    +  // we use broadcast to send large map output statuses.
    +  if (minSizeForBroadcast > maxRpcMessageSize) {
    +    val msg = s"spark.shuffle.mapOutput.minSizeForBroadcast ($minSizeForBroadcast bytes) must " +
    +      s"be <= spark.rpc.message.maxSize ($maxRpcMessageSize bytes) to prevent sending an rpc " +
    +      "message that is too large."
    +    logError(msg)
    +    throw new IllegalArgumentException(msg)
    --- End diff --
    
    Personally I prefer erroring out in this case. It fails very quickly, so you won't lose any work, and if the user sets an undocumented config incorrectly I think we should tell them so they can fix it rather than get unexpected behavior. It's also possible they set spark.rpc.message.maxSize really small, which I think would be a bad thing too, so again erroring feels like the better choice here.
    
    If there are other reasons we should just warn, let me know.




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the pull request:

    https://github.com/apache/spark/pull/12113#issuecomment-214739972
  
    @rxin did you have a chance to take a look at this?




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12113#issuecomment-217535862
  
    **[Test build #58021 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/58021/consoleFull)** for PR 12113 at commit [`396632a`](https://github.com/apache/spark/commit/396632a030e2fc5618e5946e735b9fecacd2bda1).




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12113#discussion_r62119690
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -296,10 +290,89 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
       protected val mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
       private val cachedSerializedStatuses = new ConcurrentHashMap[Int, Array[Byte]]().asScala
     
    +  private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
    +
    +  // Kept in sync with cachedSerializedStatuses explicitly
    +  // This is required so that the Broadcast variable remains in scope until we remove
    +  // the shuffleId explicitly or implicitly.
    +  private val cachedSerializedBroadcast = new HashMap[Int, Broadcast[Array[Byte]]]()
    +
    +  // This is to prevent multiple serializations of the same shuffle - which happens when
    +  // there is a request storm when a shuffle starts.
    +  private val shuffleIdLocks = new ConcurrentHashMap[Int, AnyRef]()
    +
    +  // requests for map output statuses
    +  private val mapOutputRequests = new LinkedBlockingQueue[GetMapOutputMessage]
    +
    +  // Thread pool used for handling map output status requests. This is a separate thread pool
    +  // to ensure we don't block the normal dispatcher threads.
    +  private val threadpool: ThreadPoolExecutor = {
    +    val numThreads = conf.getInt("spark.shuffle.mapOutput.dispatcher.numThreads", 8)
    +    val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "map-output-dispatcher")
    +    for (i <- 0 until numThreads) {
    +      pool.execute(new MessageLoop)
    +    }
    +    pool
    +  }
    +
    +  def post(message: GetMapOutputMessage): Unit = {
    +    mapOutputRequests.offer(message)
    +  }
    +
    +  /** Message loop used for dispatching messages. */
    +  private class MessageLoop extends Runnable {
    +    override def run(): Unit = {
    +      try {
    +        while (true) {
    +          try {
    +            val data = mapOutputRequests.take()
    +             if (data == PoisonPill) {
    +              // Put PoisonPill back so that other MessageLoops can see it.
    +              mapOutputRequests.offer(PoisonPill)
    +              return
    +            }
    +            val context = data.context
    +            val shuffleId = data.shuffleId
    +            val hostPort = context.senderAddress.hostPort
    +            logDebug("Handling request to send map output locations for shuffle " + shuffleId +
    +              " to " + hostPort)
    +            val mapOutputStatuses = getSerializedMapOutputStatuses(shuffleId)
    +            val serializedSize = mapOutputStatuses.length
    +            if (serializedSize > maxRpcMessageSize) {
    +              val msg = s"Map output statuses were $serializedSize bytes which " +
    +                s"exceeds spark.rpc.message.maxSize ($maxRpcMessageSize bytes)."
    +
    +              // For SPARK-1244 we'll opt for just logging an error and then sending it to
    +              // the sender. A bigger refactoring (SPARK-1239) will ultimately remove this
    --- End diff --
    
    We could, but one of the reasons I like the broadcast size being configurable is that it allows for a fallback in case something goes wrong with broadcasts or someone just doesn't want to use them. You could configure the broadcast min size to be very large and go back to the previous behavior. We've been running this for quite some time now with no issues, but I'm sure what we run doesn't cover all use cases out there.
    
    I guess we could still allow that, but you would then have to set the min broadcast size to something like maxSize - 1. So I guess I'm fine with either; would you like me to change it?
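
A minimal sketch of the trade-off discussed above, assuming a simple size threshold. The object and method names (`MapOutputTransportSketch`, `validate`, `choose`) are hypothetical and not part of the patch, and the inclusive `>=` comparison is an assumption. It shows how a start-up check can coexist with the "fall back to direct sends" option by setting the threshold just under the RPC limit.

```scala
// Hypothetical sketch; names are illustrative and not Spark's API.
object MapOutputTransportSketch {
  sealed trait Transport
  case object Direct extends Transport    // serialized statuses sent inline in the RPC reply
  case object Broadcast extends Transport // reply carries only a small broadcast handle

  // Fail fast on a configuration that could produce an oversized direct reply.
  def validate(minSizeForBroadcast: Long, maxRpcMessageSize: Long): Unit =
    require(
      minSizeForBroadcast <= maxRpcMessageSize,
      s"minSizeForBroadcast ($minSizeForBroadcast bytes) must be <= " +
        s"maxRpcMessageSize ($maxRpcMessageSize bytes)")

  // Assumed rule: statuses at or above the threshold are broadcast, smaller ones go direct.
  def choose(serializedSize: Long, minSizeForBroadcast: Long): Transport =
    if (serializedSize >= minSizeForBroadcast) Broadcast else Direct
}
```

With an illustrative RPC limit of 128 MiB, `validate(maxRpc - 1, maxRpc)` passes and `choose` returns `Direct` for anything smaller than the limit, which is the fallback described above; `validate(maxRpc + 1, maxRpc)` throws an `IllegalArgumentException` at construction time rather than failing mid-job.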





[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12113#discussion_r62115019
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -296,10 +290,89 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
       protected val mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
       private val cachedSerializedStatuses = new ConcurrentHashMap[Int, Array[Byte]]().asScala
     
    +  private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
    +
    +  // Kept in sync with cachedSerializedStatuses explicitly
    +  // This is required so that the Broadcast variable remains in scope until we remove
    +  // the shuffleId explicitly or implicitly.
    +  private val cachedSerializedBroadcast = new HashMap[Int, Broadcast[Array[Byte]]]()
    +
    +  // This is to prevent multiple serializations of the same shuffle - which happens when
    +  // there is a request storm when a shuffle starts.
    +  private val shuffleIdLocks = new ConcurrentHashMap[Int, AnyRef]()
    +
    +  // requests for map output statuses
    +  private val mapOutputRequests = new LinkedBlockingQueue[GetMapOutputMessage]
    +
    +  // Thread pool used for handling map output status requests. This is a separate thread pool
    +  // to ensure we don't block the normal dispatcher threads.
    +  private val threadpool: ThreadPoolExecutor = {
    +    val numThreads = conf.getInt("spark.shuffle.mapOutput.dispatcher.numThreads", 8)
    +    val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "map-output-dispatcher")
    +    for (i <- 0 until numThreads) {
    +      pool.execute(new MessageLoop)
    +    }
    +    pool
    +  }
    +
    +  def post(message: GetMapOutputMessage): Unit = {
    +    mapOutputRequests.offer(message)
    +  }
    +
    +  /** Message loop used for dispatching messages. */
    +  private class MessageLoop extends Runnable {
    +    override def run(): Unit = {
    +      try {
    +        while (true) {
    +          try {
    +            val data = mapOutputRequests.take()
    +             if (data == PoisonPill) {
    +              // Put PoisonPill back so that other MessageLoops can see it.
    +              mapOutputRequests.offer(PoisonPill)
    +              return
    +            }
    +            val context = data.context
    +            val shuffleId = data.shuffleId
    +            val hostPort = context.senderAddress.hostPort
    +            logDebug("Handling request to send map output locations for shuffle " + shuffleId +
    +              " to " + hostPort)
    +            val mapOutputStatuses = getSerializedMapOutputStatuses(shuffleId)
    +            val serializedSize = mapOutputStatuses.length
    +            if (serializedSize > maxRpcMessageSize) {
    +              val msg = s"Map output statuses were $serializedSize bytes which " +
    +                s"exceeds spark.rpc.message.maxSize ($maxRpcMessageSize bytes)."
    +
    +              // For SPARK-1244 we'll opt for just logging an error and then sending it to
    +              // the sender. A bigger refactoring (SPARK-1239) will ultimately remove this
    --- End diff --
    
    I assume the comment was written with a different implementation of SPARK-1239 in mind. Generally it wouldn't be a problem, since we broadcast anything that large and the messages are tiny. If someone configures it such that it doesn't use broadcast, or configures the maxSize really small, then you could hit this.
    
    So I'll update the comment.




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12113#discussion_r62117794
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -428,40 +503,89 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
         }
       }
     
    +  private def removeBroadcast(bcast: Broadcast[_]): Unit = {
    +    if (null != bcast) {
    +      broadcastManager.unbroadcast(bcast.id,
    +        removeFromDriver = true, blocking = false)
    +    }
    +  }
    +
    +  private def clearCachedBroadcast(): Unit = {
    +    for (cached <- cachedSerializedBroadcast) removeBroadcast(cached._2)
    +    cachedSerializedBroadcast.clear()
    +  }
    +
       def getSerializedMapOutputStatuses(shuffleId: Int): Array[Byte] = {
         var statuses: Array[MapStatus] = null
    +    var retBytes: Array[Byte] = null
         var epochGotten: Long = -1
    -    epochLock.synchronized {
    -      if (epoch > cacheEpoch) {
    -        cachedSerializedStatuses.clear()
    -        cacheEpoch = epoch
    -      }
    -      cachedSerializedStatuses.get(shuffleId) match {
    -        case Some(bytes) =>
    -          return bytes
    -        case None =>
    -          statuses = mapStatuses.getOrElse(shuffleId, Array[MapStatus]())
    -          epochGotten = epoch
    +
    +    // Check to see if we have a cached version, returns true if it does
    +    // and has side effect of setting retBytes.  If not returns false
    +    // with side effect of setting statuses
    +    def checkCachedStatuses(): Boolean = {
    +      epochLock.synchronized {
    +        if (epoch > cacheEpoch) {
    +          cachedSerializedStatuses.clear()
    +          clearCachedBroadcast()
    +          cacheEpoch = epoch
    +        }
    +        cachedSerializedStatuses.get(shuffleId) match {
    +          case Some(bytes) =>
    +            retBytes = bytes
    +            true
    +          case None =>
    +            logDebug("cached status not found for : " + shuffleId)
    +            statuses = mapStatuses.getOrElse(shuffleId, Array[MapStatus]())
    +            epochGotten = epoch
    +            false
    +        }
           }
         }
    -    // If we got here, we failed to find the serialized locations in the cache, so we pulled
    -    // out a snapshot of the locations as "statuses"; let's serialize and return that
    -    val bytes = MapOutputTracker.serializeMapStatuses(statuses)
    -    logInfo("Size of output statuses for shuffle %d is %d bytes".format(shuffleId, bytes.length))
    -    // Add them into the table only if the epoch hasn't changed while we were working
    -    epochLock.synchronized {
    -      if (epoch == epochGotten) {
    -        cachedSerializedStatuses(shuffleId) = bytes
    +
    +    if (checkCachedStatuses()) return retBytes
    +    var shuffleIdLock = shuffleIdLocks.get(shuffleId)
    +    if (null == shuffleIdLock) {
    +      val newLock = new Object()
    +      // in general, this condition should be false - but good to be paranoid
    +      val prevLock = shuffleIdLocks.putIfAbsent(shuffleId, newLock)
    --- End diff --
    
    As the comment says, it's just being paranoid in case something weird/unexpected happens; that shouldn't be the norm, since the register should happen before anyone tries to fetch. A couple of lines of code that hardly ever (or never) get called seem much cheaper than adding a synchronize all the time.




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12113#discussion_r62332230
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -428,40 +503,89 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
         }
       }
     
    +  private def removeBroadcast(bcast: Broadcast[_]): Unit = {
    +    if (null != bcast) {
    +      broadcastManager.unbroadcast(bcast.id,
    +        removeFromDriver = true, blocking = false)
    +    }
    +  }
    +
    +  private def clearCachedBroadcast(): Unit = {
    +    for (cached <- cachedSerializedBroadcast) removeBroadcast(cached._2)
    +    cachedSerializedBroadcast.clear()
    +  }
    +
       def getSerializedMapOutputStatuses(shuffleId: Int): Array[Byte] = {
         var statuses: Array[MapStatus] = null
    +    var retBytes: Array[Byte] = null
         var epochGotten: Long = -1
    -    epochLock.synchronized {
    -      if (epoch > cacheEpoch) {
    -        cachedSerializedStatuses.clear()
    -        cacheEpoch = epoch
    -      }
    -      cachedSerializedStatuses.get(shuffleId) match {
    -        case Some(bytes) =>
    -          return bytes
    -        case None =>
    -          statuses = mapStatuses.getOrElse(shuffleId, Array[MapStatus]())
    -          epochGotten = epoch
    +
    +    // Check to see if we have a cached version, returns true if it does
    +    // and has side effect of setting retBytes.  If not returns false
    +    // with side effect of setting statuses
    +    def checkCachedStatuses(): Boolean = {
    +      epochLock.synchronized {
    +        if (epoch > cacheEpoch) {
    +          cachedSerializedStatuses.clear()
    +          clearCachedBroadcast()
    +          cacheEpoch = epoch
    +        }
    +        cachedSerializedStatuses.get(shuffleId) match {
    +          case Some(bytes) =>
    +            retBytes = bytes
    +            true
    +          case None =>
    +            logDebug("cached status not found for : " + shuffleId)
    +            statuses = mapStatuses.getOrElse(shuffleId, Array[MapStatus]())
    +            epochGotten = epoch
    +            false
    +        }
           }
         }
    -    // If we got here, we failed to find the serialized locations in the cache, so we pulled
    -    // out a snapshot of the locations as "statuses"; let's serialize and return that
    -    val bytes = MapOutputTracker.serializeMapStatuses(statuses)
    -    logInfo("Size of output statuses for shuffle %d is %d bytes".format(shuffleId, bytes.length))
    -    // Add them into the table only if the epoch hasn't changed while we were working
    -    epochLock.synchronized {
    -      if (epoch == epochGotten) {
    -        cachedSerializedStatuses(shuffleId) = bytes
    +
    +    if (checkCachedStatuses()) return retBytes
    +    var shuffleIdLock = shuffleIdLocks.get(shuffleId)
    +    if (null == shuffleIdLock) {
    +      val newLock = new Object()
    +      // in general, this condition should be false - but good to be paranoid
    +      val prevLock = shuffleIdLocks.putIfAbsent(shuffleId, newLock)
    --- End diff --
    
    It's purely defensive programming to allow things to work when the unexpected happens. Would you rather have your production job that has been running for 5 hours throw a null pointer exception, or try to fix itself and continue to run?
    In distributed systems weird things happen, and this is processing a message from another host/task which you don't have direct control of. You can get network breaks, weird host failures or pauses, etc., and a message comes in late asking for a shuffle id that isn't there anymore.
    The unregister shuffle call, which removes the lock for the shuffle id, is made from the context cleaner, so if an RDD goes out of scope and is cleaned up, the shuffle lock gets removed. As I mention above, if some host was slightly out of sync and sent a message to fetch that id late, we would throw a null pointer exception. Everything else in the GetMapOutputStatuses handling deals with this case, and there is actually a test for it (fetching after unregister), so if this line is removed that test fails.
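
The defensive lookup described above can be sketched in isolation. This is a hypothetical stand-alone example, not the `MapOutputTrackerMaster` code: `ShuffleLockSketch`, `register`, `unregister`, and `lockFor` are illustrative names. It shows that a late request arriving after the lock was removed still gets a usable lock object instead of `null`.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical sketch of the lookup-or-create pattern; names are illustrative.
object ShuffleLockSketch {
  private val shuffleIdLocks = new ConcurrentHashMap[Int, AnyRef]()

  // Normal path: the lock is created when the shuffle is registered.
  def register(shuffleId: Int): Unit = {
    shuffleIdLocks.putIfAbsent(shuffleId, new Object())
    ()
  }

  // Cleanup path: the lock is dropped when the shuffle is unregistered.
  def unregister(shuffleId: Int): Unit = {
    shuffleIdLocks.remove(shuffleId)
    ()
  }

  // Defensive lookup: never returns null, even for a shuffle id that was
  // already unregistered (or never registered).
  def lockFor(shuffleId: Int): AnyRef = {
    val existing = shuffleIdLocks.get(shuffleId)
    if (existing != null) existing
    else {
      val newLock = new Object()
      // putIfAbsent returns the previous value if another thread won the race.
      val prev = shuffleIdLocks.putIfAbsent(shuffleId, newLock)
      if (prev != null) prev else newLock
    }
  }
}
```

The extra branch costs one `putIfAbsent` on a path that should rarely run, and it keeps `lockFor(id).synchronized { ... }` safe without a global lock around every lookup.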





[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12113#issuecomment-217561095
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/58021/
    Test PASSed.




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the pull request:

    https://github.com/apache/spark/pull/12113#issuecomment-217521502
  
    Right, you can still run out of memory if you are under the broadcast limit.  The only way to prevent that would be to really have flow control in place. I originally had some (see the jira), but the problem comes down to throttling vs. sending fast.  Netty unfortunately doesn't really support that either, so you have to do it on your own.   Eventually we could look at just getting rid of the non-broadcast method, but I would want to do some more performance testing on that. The threadpool in the non-broadcast case just allows synchronizing so that multiple threads aren't serializing the same thing in parallel.
    
    Yes, if you read the jira you can see my reasoning and the other things I considered.  Personally I think that limits you for future enhancements: for instance, being able to start reduce tasks before all map tasks finish, or being able to refetch map statuses without restarting entire tasks. If we weren't doing broadcast you could do more chunking.  You also end up with very large task data. I didn't spend a lot of time looking at that path, but I think you could have the same issue there.  When we launch a bunch of tasks we just have a for loop sending them. So if we are shoving the map statuses into the task info, which is being sent over netty, and we can't send them fast enough, you get the memory bloat again.
    
    If we still have problems with this then we could look at doing that or something else, but at this point this has been working very well for us (if you read the description you can see my results).
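
The point about threads not serializing the same thing in parallel can be illustrated with a small per-key lock plus a double-checked cache. This is only a sketch under assumptions: `SerializeOnceCache` and its `serialize` parameter are hypothetical, and epoch handling and broadcast are left out entirely.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical cache: all names are illustrative, not Spark's.
class SerializeOnceCache(serialize: Int => Array[Byte]) {
  private val cache = new ConcurrentHashMap[Int, Array[Byte]]()
  private val locks = new ConcurrentHashMap[Int, AnyRef]()

  def get(shuffleId: Int): Array[Byte] = {
    // Fast path: already serialized by an earlier request.
    val cached = cache.get(shuffleId)
    if (cached != null) return cached

    // One lock object per shuffle id, so different shuffles do not block each other.
    locks.putIfAbsent(shuffleId, new Object())
    val lock = locks.get(shuffleId)
    lock.synchronized {
      // Double check: another thread may have filled the cache while we waited.
      val again = cache.get(shuffleId)
      if (again != null) again
      else {
        val bytes = serialize(shuffleId)
        cache.put(shuffleId, bytes)
        bytes
      }
    }
  }
}
```

With this shape, a burst of requests for the same shuffle id performs the expensive serialization once, and the remaining threads return the cached bytes after the lock is released. It does nothing to throttle how fast responses are handed to the network layer, which is the separate flow-control problem mentioned above.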




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12113#issuecomment-204608350
  
    **[Test build #54719 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/54719/consoleFull)** for PR 12113 at commit [`ab17d52`](https://github.com/apache/spark/commit/ab17d52e9c8e246d555ac03e4c60c03e3ed54820).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/12113#issuecomment-211721515
  
    Looking at this now ... (might take a while though because I'd need to page in all the details)




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12113#issuecomment-217251327
  
    **[Test build #57912 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/57912/consoleFull)** for PR 12113 at commit [`0382155`](https://github.com/apache/spark/commit/0382155ef73437e490abdf83af2b232adbab0eb6).




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12113#issuecomment-204561374
  
    **[Test build #54712 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/54712/consoleFull)** for PR 12113 at commit [`3c1def0`](https://github.com/apache/spark/commit/3c1def02b80ebfed55904e504609aa02de6559ce).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12113#discussion_r58267242
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -428,40 +503,93 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
         }
       }
     
    +  private def removeBroadcast(bcast: Broadcast[_]): Unit = {
    +    if (null != bcast) {
    +      broadcastManager.unbroadcast(bcast.id,
    +        removeFromDriver = true, blocking = false)
    +    }
    +  }
    +
    +  private def clearCachedBroadcast(): Unit = {
    +    for (cached <- cachedSerializedBroadcast) removeBroadcast(cached._2)
    +    cachedSerializedBroadcast.clear()
    +  }
    +
       def getSerializedMapOutputStatuses(shuffleId: Int): Array[Byte] = {
         var statuses: Array[MapStatus] = null
         var epochGotten: Long = -1
         epochLock.synchronized {
           if (epoch > cacheEpoch) {
             cachedSerializedStatuses.clear()
    +        clearCachedBroadcast()
             cacheEpoch = epoch
           }
           cachedSerializedStatuses.get(shuffleId) match {
             case Some(bytes) =>
               return bytes
             case None =>
    +          logDebug("cached status not found for : " + shuffleId)
               statuses = mapStatuses.getOrElse(shuffleId, Array[MapStatus]())
               epochGotten = epoch
           }
         }
    -    // If we got here, we failed to find the serialized locations in the cache, so we pulled
    -    // out a snapshot of the locations as "statuses"; let's serialize and return that
    -    val bytes = MapOutputTracker.serializeMapStatuses(statuses)
    -    logInfo("Size of output statuses for shuffle %d is %d bytes".format(shuffleId, bytes.length))
    -    // Add them into the table only if the epoch hasn't changed while we were working
    -    epochLock.synchronized {
    -      if (epoch == epochGotten) {
    -        cachedSerializedStatuses(shuffleId) = bytes
    +
    +    var shuffleIdLock = shuffleIdLocks.get(shuffleId)
    +    if (null == shuffleIdLock) {
    +      val newLock = new Object()
    +      // in general, this condition should be false - but good to be paranoid
    +      val prevLock = shuffleIdLocks.putIfAbsent(shuffleId, newLock)
    +      shuffleIdLock = if (null != prevLock) prevLock else newLock
    +    }
    +    val newbytes = shuffleIdLock.synchronized {
    +
    +      // double check to make sure someone else didn't serialize and cache the same
    +      // mapstatus while we were waiting on the synchronize
    +      epochLock.synchronized {
    +        if (epoch > cacheEpoch) {
    +          cachedSerializedStatuses.clear()
    +          clearCachedBroadcast()
    +          cacheEpoch = epoch
    +        }
    +        cachedSerializedStatuses.get(shuffleId) match {
    +          case Some(bytes) =>
    +            return bytes
    +          case None =>
    +            logDebug("shuffle lock cached status not found for : " + shuffleId)
    +            statuses = mapStatuses.getOrElse(shuffleId, Array[MapStatus]())
    +            epochGotten = epoch
    +        }
    +      }
    --- End diff --
    
    Yeah, good idea. I had started to, but since it returns from inside the block it makes it a bit more complex, and I forgot to get back to it.




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12113#discussion_r62125338
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -296,10 +290,89 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
       protected val mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
       private val cachedSerializedStatuses = new ConcurrentHashMap[Int, Array[Byte]]().asScala
     
    +  private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
    +
    +  // Kept in sync with cachedSerializedStatuses explicitly
    +  // This is required so that the Broadcast variable remains in scope until we remove
    +  // the shuffleId explicitly or implicitly.
    +  private val cachedSerializedBroadcast = new HashMap[Int, Broadcast[Array[Byte]]]()
    +
    +  // This is to prevent multiple serializations of the same shuffle - which happens when
    +  // there is a request storm when shuffle start.
    +  private val shuffleIdLocks = new ConcurrentHashMap[Int, AnyRef]()
    +
    +  // requests for map output statuses
    +  private val mapOutputRequests = new LinkedBlockingQueue[GetMapOutputMessage]
    +
    +  // Thread pool used for handling map output status requests. This is a separate thread pool
    +  // to ensure we don't block the normal dispatcher threads.
    +  private val threadpool: ThreadPoolExecutor = {
    +    val numThreads = conf.getInt("spark.shuffle.mapOutput.dispatcher.numThreads", 8)
    +    val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "map-output-dispatcher")
    +    for (i <- 0 until numThreads) {
    +      pool.execute(new MessageLoop)
    +    }
    +    pool
    +  }
    +
    +  def post(message: GetMapOutputMessage): Unit = {
    +    mapOutputRequests.offer(message)
    +  }
    +
    +  /** Message loop used for dispatching messages. */
    +  private class MessageLoop extends Runnable {
    +    override def run(): Unit = {
    +      try {
    +        while (true) {
    +          try {
    +            val data = mapOutputRequests.take()
    +             if (data == PoisonPill) {
    +              // Put PoisonPill back so that other MessageLoops can see it.
    +              mapOutputRequests.offer(PoisonPill)
    +              return
    +            }
    +            val context = data.context
    +            val shuffleId = data.shuffleId
    +            val hostPort = context.senderAddress.hostPort
    +            logDebug("Handling request to send map output locations for shuffle " + shuffleId +
    +              " to " + hostPort)
    +            val mapOutputStatuses = getSerializedMapOutputStatuses(shuffleId)
    +            val serializedSize = mapOutputStatuses.length
    +            if (serializedSize > maxRpcMessageSize) {
    +              val msg = s"Map output statuses were $serializedSize bytes which " +
    +                s"exceeds spark.rpc.message.maxSize ($maxRpcMessageSize bytes)."
    +
    +              // For SPARK-1244 we'll opt for just logging an error and then sending it to
    +              // the sender. A bigger refactoring (SPARK-1239) will ultimately remove this
    --- End diff --
    
    The previous behavior is a failure; it does not make sense to keep following it. Please change it.




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12113#discussion_r58325719
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -296,10 +290,89 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
       protected val mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
       private val cachedSerializedStatuses = new ConcurrentHashMap[Int, Array[Byte]]().asScala
     
    +  private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
    +
    +  // Kept in sync with cachedSerializedStatuses explicitly
    +  // This is required so that the Broadcast variable remains in scope until we remove
    +  // the shuffleId explicitly or implicitly.
    +  private val cachedSerializedBroadcast = new HashMap[Int, Broadcast[Array[Byte]]]()
    +
    +  // This is to prevent multiple serializations of the same shuffle - which happens when
    +  // there is a request storm when shuffle start.
    +  private val shuffleIdLocks = new ConcurrentHashMap[Int, AnyRef]()
    +
    +  // requests for map output statuses
    +  private val mapOutputRequests = new LinkedBlockingQueue[GetMapOutputMessage]
    +
    +  // Thread pool used for handling map output status requests. This is a separate thread pool
    +  // to ensure we don't block the normal dispatcher threads.
    +  private val threadpool: ThreadPoolExecutor = {
    +    val numThreads = conf.getInt("spark.shuffle.mapOutput.dispatcher.numThreads", 8)
    +    val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "map-output-dispatcher")
    +    for (i <- 0 until numThreads) {
    +      pool.execute(new MessageLoop)
    +    }
    +    pool
    +  }
    +
    +  def post(message: GetMapOutputMessage): Unit = {
    +    mapOutputRequests.offer(message)
    +  }
    +
    +  /** Message loop used for dispatching messages. */
    +  private class MessageLoop extends Runnable {
    +    override def run(): Unit = {
    +      try {
    +        while (true) {
    +          try {
    +            val data = mapOutputRequests.take()
    +             if (data == PoisonPill) {
    +              // Put PoisonPill back so that other MessageLoops can see it.
    +              mapOutputRequests.offer(PoisonPill)
    +              return
    +            }
    +            val context = data.context
    +            val shuffleId = data.shuffleId
    +            val hostPort = context.senderAddress.hostPort
    +            logDebug("Handling request to send map output locations for shuffle " + shuffleId +
    +              " to " + hostPort)
    +            val mapOutputStatuses = getSerializedMapOutputStatuses(shuffleId)
    --- End diff --
    
    This call is time-consuming and may cause a timeout.





[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12113#issuecomment-217283775
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12113#issuecomment-204569422
  
    **[Test build #54719 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/54719/consoleFull)** for PR 12113 at commit [`ab17d52`](https://github.com/apache/spark/commit/ab17d52e9c8e246d555ac03e4c60c03e3ed54820).




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12113#issuecomment-217283778
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/57912/
    Test PASSed.




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12113#discussion_r62184186
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -296,10 +290,89 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
       protected val mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
       private val cachedSerializedStatuses = new ConcurrentHashMap[Int, Array[Byte]]().asScala
     
    +  private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
    +
    +  // Kept in sync with cachedSerializedStatuses explicitly
    +  // This is required so that the Broadcast variable remains in scope until we remove
    +  // the shuffleId explicitly or implicitly.
    +  private val cachedSerializedBroadcast = new HashMap[Int, Broadcast[Array[Byte]]]()
    +
    +  // This is to prevent multiple serializations of the same shuffle - which happens when
    +  // there is a request storm when shuffle start.
    +  private val shuffleIdLocks = new ConcurrentHashMap[Int, AnyRef]()
    +
    +  // requests for map output statuses
    +  private val mapOutputRequests = new LinkedBlockingQueue[GetMapOutputMessage]
    +
    +  // Thread pool used for handling map output status requests. This is a separate thread pool
    +  // to ensure we don't block the normal dispatcher threads.
    +  private val threadpool: ThreadPoolExecutor = {
    +    val numThreads = conf.getInt("spark.shuffle.mapOutput.dispatcher.numThreads", 8)
    +    val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "map-output-dispatcher")
    +    for (i <- 0 until numThreads) {
    +      pool.execute(new MessageLoop)
    +    }
    +    pool
    +  }
    +
    +  def post(message: GetMapOutputMessage): Unit = {
    +    mapOutputRequests.offer(message)
    +  }
    +
    +  /** Message loop used for dispatching messages. */
    +  private class MessageLoop extends Runnable {
    +    override def run(): Unit = {
    +      try {
    +        while (true) {
    +          try {
    +            val data = mapOutputRequests.take()
    +             if (data == PoisonPill) {
    +              // Put PoisonPill back so that other MessageLoops can see it.
    +              mapOutputRequests.offer(PoisonPill)
    +              return
    +            }
    +            val context = data.context
    +            val shuffleId = data.shuffleId
    +            val hostPort = context.senderAddress.hostPort
    +            logDebug("Handling request to send map output locations for shuffle " + shuffleId +
    +              " to " + hostPort)
    +            val mapOutputStatuses = getSerializedMapOutputStatuses(shuffleId)
    +            val serializedSize = mapOutputStatuses.length
    +            if (serializedSize > maxRpcMessageSize) {
    +              val msg = s"Map output statuses were $serializedSize bytes which " +
    +                s"exceeds spark.rpc.message.maxSize ($maxRpcMessageSize bytes)."
    +
    +              // For SPARK-1244 we'll opt for just logging an error and then sending it to
    +              // the sender. A bigger refactoring (SPARK-1239) will ultimately remove this
    --- End diff --
    
    @davies, either case is a failure; it is just a question of whether you fail at run time or fail on submit by having the upfront check of the configs.
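
One way to read "upfront check of the configs" is a validation done once at construction time. The sketch below is an assumption about what such a check could look like: `BroadcastConfigCheck` and `validate` are hypothetical names, and the wording of the error message is illustrative only.

```scala
// Hypothetical helper: fail fast if the broadcast threshold is above the RPC limit.
object BroadcastConfigCheck {
  def validate(minBroadcastSize: Long, maxRpcMessageSize: Long): Unit = {
    // If the threshold is at or below the RPC limit, any payload too large for a
    // direct RPC reply is necessarily sent via broadcast, so the run-time
    // "message too large" failure cannot be reached on the direct path.
    require(
      minBroadcastSize <= maxRpcMessageSize,
      s"Broadcast threshold ($minBroadcastSize bytes) must be <= " +
        s"maximum RPC message size ($maxRpcMessageSize bytes).")
  }
}
```

`require` throws `IllegalArgumentException`, so a misconfiguration surfaces when the application is submitted rather than when the first oversized map output status is requested.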




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12113#issuecomment-206568680
  
    **[Test build #55152 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/55152/consoleFull)** for PR 12113 at commit [`fcef95b`](https://github.com/apache/spark/commit/fcef95be071dac058511616d9db2ecedea75348a).




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12113#issuecomment-204561549
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12113#issuecomment-207124019
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/55244/
    Test PASSed.




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12113#discussion_r58253255
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -477,12 +605,16 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
     private[spark] object MapOutputTracker extends Logging {
     
       val ENDPOINT_NAME = "MapOutputTracker"
    +  private val DIRECT = 0
    +  private val BROADCAST = 1
     
       // Serialize an array of map output locations into an efficient byte format so that we can send
       // it to reduce tasks. We do this by compressing the serialized bytes using GZIP. They will
       // generally be pretty compressible because many map outputs will be on the same hostname.
    -  def serializeMapStatuses(statuses: Array[MapStatus]): Array[Byte] = {
    +  def serializeMapStatuses(statuses: Array[MapStatus], broadcastManager: BroadcastManager,
    +      isLocal: Boolean, minBroadcastSize: Int): (Array[Byte], Broadcast[Array[Byte]]) = {
    --- End diff --
    
    Both values are used in getSerializedMapOutputStatuses
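
The diff above introduces `DIRECT` and `BROADCAST` markers but does not show how they travel with the payload. The following sketch assumes a one-byte tag written ahead of the GZIP-compressed, Java-serialized value; `TaggedPayload`, `encode` and `decode` are hypothetical names, and no Spark classes are used.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

// Hypothetical encoding: first byte is the tag, the rest is gzip(serialized value).
object TaggedPayload {
  val DIRECT = 0
  val BROADCAST = 1

  def encode(tag: Int, value: AnyRef): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    out.write(tag) // uncompressed tag byte, readable without inflating the payload
    val objOut = new ObjectOutputStream(new GZIPOutputStream(out))
    try objOut.writeObject(value) finally objOut.close()
    out.toByteArray
  }

  def decode(bytes: Array[Byte]): (Int, AnyRef) = {
    val tag = bytes(0).toInt
    val in = new ObjectInputStream(
      new GZIPInputStream(new ByteArrayInputStream(bytes, 1, bytes.length - 1)))
    try (tag, in.readObject()) finally in.close()
  }
}
```

A receiver would inspect the tag first: under this assumed layout, `DIRECT` means the decoded value is the data itself, while `BROADCAST` would mean the decoded value is a handle that still has to be resolved.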


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the pull request:

    https://github.com/apache/spark/pull/12113#issuecomment-217528211
  
    Yeah, I would mostly be worried about hitting the same issue with memory bloat, but it would need more investigation.  I'll update for the getSizeAs.




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/12113#issuecomment-217509314
  
    LGTM, will merge this today if no more comments from other people.




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the pull request:

    https://github.com/apache/spark/pull/12113#issuecomment-217152751
  
    @andrewor14  I commented on that PR.  It is not needed if this PR goes in.




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12113#issuecomment-217561092
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12113#issuecomment-217533566
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/58018/
    Test FAILed.




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/12113#issuecomment-217523737
  
    Yeah, I didn't mean to suggest the alternative approach to block this; I think this is fine as an improvement. I was just wondering if it's worth it to consider embedding the shuffle location data in tasks as a future enhancement, or if it has fundamental problems that make it a non-starter.




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12113#discussion_r58503061
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -428,40 +503,93 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
         }
       }
     
    +  private def removeBroadcast(bcast: Broadcast[_]): Unit = {
    +    if (null != bcast) {
    +      broadcastManager.unbroadcast(bcast.id,
    +        removeFromDriver = true, blocking = false)
    +    }
    +  }
    +
    +  private def clearCachedBroadcast(): Unit = {
    +    for (cached <- cachedSerializedBroadcast) removeBroadcast(cached._2)
    +    cachedSerializedBroadcast.clear()
    +  }
    +
       def getSerializedMapOutputStatuses(shuffleId: Int): Array[Byte] = {
         var statuses: Array[MapStatus] = null
         var epochGotten: Long = -1
         epochLock.synchronized {
           if (epoch > cacheEpoch) {
             cachedSerializedStatuses.clear()
    +        clearCachedBroadcast()
             cacheEpoch = epoch
           }
           cachedSerializedStatuses.get(shuffleId) match {
             case Some(bytes) =>
               return bytes
             case None =>
    +          logDebug("cached status not found for : " + shuffleId)
               statuses = mapStatuses.getOrElse(shuffleId, Array[MapStatus]())
               epochGotten = epoch
           }
         }
    -    // If we got here, we failed to find the serialized locations in the cache, so we pulled
    -    // out a snapshot of the locations as "statuses"; let's serialize and return that
    -    val bytes = MapOutputTracker.serializeMapStatuses(statuses)
    -    logInfo("Size of output statuses for shuffle %d is %d bytes".format(shuffleId, bytes.length))
    -    // Add them into the table only if the epoch hasn't changed while we were working
    -    epochLock.synchronized {
    -      if (epoch == epochGotten) {
    -        cachedSerializedStatuses(shuffleId) = bytes
    +
    +    var shuffleIdLock = shuffleIdLocks.get(shuffleId)
    +    if (null == shuffleIdLock) {
    +      val newLock = new Object()
    +      // in general, this condition should be false - but good to be paranoid
    +      val prevLock = shuffleIdLocks.putIfAbsent(shuffleId, newLock)
    +      shuffleIdLock = if (null != prevLock) prevLock else newLock
    +    }
    +    val newbytes = shuffleIdLock.synchronized {
    +
    +      // double check to make sure someone else didn't serialize and cache the same
    +      // mapstatus while we were waiting on the synchronize
    +      epochLock.synchronized {
    +        if (epoch > cacheEpoch) {
    +          cachedSerializedStatuses.clear()
    +          clearCachedBroadcast()
    +          cacheEpoch = epoch
    +        }
    +        cachedSerializedStatuses.get(shuffleId) match {
    +          case Some(bytes) =>
    +            return bytes
    +          case None =>
    +            logDebug("shuffle lock cached status not found for : " + shuffleId)
    +            statuses = mapStatuses.getOrElse(shuffleId, Array[MapStatus]())
    +            epochGotten = epoch
    +        }
    +      }
    +
    +      // If we got here, we failed to find the serialized locations in the cache, so we pulled
    +      // out a snapshot of the locations as "statuses"; let's serialize and return that
    +      val (bytes, bcast) = MapOutputTracker.serializeMapStatuses(statuses, broadcastManager,
    +        isLocal, minSizeForBroadcast)
    +      logInfo("Size of output statuses for shuffle %d is %d bytes".format(shuffleId, bytes.length))
    +      // Add them into the table only if the epoch hasn't changed while we were working
    +      epochLock.synchronized {
    +        if (epoch == epochGotten) {
    +          cachedSerializedStatuses(shuffleId) = bytes
    +          if (null != bcast) cachedSerializedBroadcast(shuffleId) = bcast
    +        } else {
    +          logInfo("Epoch changed, not caching!")
    +          removeBroadcast(bcast)
    +        }
           }
    +      bytes
         }
    -    bytes
    +    newbytes
       }
     
       override def stop() {
    +    mapOutputRequests.offer(PoisonPill)
    +    threadpool.shutdown()
         sendTracker(StopMapOutputTracker)
         mapStatuses.clear()
         trackerEndpoint = null
         cachedSerializedStatuses.clear()
    +    clearCachedBroadcast()
    --- End diff --
    
    In `SparkContext.stop`, `ListenerBus.stop` is called earlier than `SparkEnv.stop`.
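
The per-shuffle locking in the diff quoted above can be sketched in isolation. This is a minimal sketch, not the patch itself: `PerKeyLockCache`, `expensiveSerialize`, and the `serializations` counter are hypothetical names, and the epoch handling and broadcast cleanup from the diff are left out.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the tracker's cache; not Spark's actual class.
object PerKeyLockCache {
  private val cache = new ConcurrentHashMap[Int, Array[Byte]]()
  private val locks = new ConcurrentHashMap[Int, AnyRef]()

  // Counts how often the expensive step runs; for demonstration only.
  val serializations = new AtomicInteger(0)

  // Stand-in for the expensive serialization of the map statuses.
  private def expensiveSerialize(key: Int): Array[Byte] = {
    serializations.incrementAndGet()
    Array.fill[Byte](16)(key.toByte)
  }

  def get(key: Int): Array[Byte] = {
    val cached = cache.get(key)
    if (cached != null) {
      // Fast path: already cached, no per-key lock needed.
      cached
    } else {
      // One lock object per key, created with putIfAbsent as in the diff.
      val newLock = new Object
      val prevLock = locks.putIfAbsent(key, newLock)
      val lock = if (prevLock != null) prevLock else newLock
      lock.synchronized {
        // Double check: another thread may have filled the cache while we waited.
        val again = cache.get(key)
        if (again != null) {
          again
        } else {
          val bytes = expensiveSerialize(key)
          cache.put(key, bytes)
          bytes
        }
      }
    }
  }
}
```

With this shape, a burst of concurrent requests for the same key serializes once, while requests for other keys are not blocked by that work.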




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12113#discussion_r58711271
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -492,16 +624,51 @@ private[spark] object MapOutputTracker extends Logging {
         } {
           objOut.close()
         }
    -    out.toByteArray
    +    val arr = out.toByteArray
    +    if (minBroadcastSize >= 0 && arr.length >= minBroadcastSize) {
    +      // Use broadcast instead.
    +      // Important arr(0) is the tag == DIRECT, ignore that while deserializing !
    +      val bcast = broadcastManager.newBroadcast(arr, isLocal)
    +      // toByteArray creates copy, so we can reuse out
    +      out.reset()
    +      out.write(BROADCAST)
    +      val oos = new ObjectOutputStream(new GZIPOutputStream(out))
    +      oos.writeObject(bcast)
    +      oos.close()
    +      val outArr = out.toByteArray
    +      logInfo("Broadcast mapstatuses size = " + outArr.length + ", actual size = " + arr.length)
    +      (outArr, bcast)
    +    } else {
    +      (arr, null)
    +    }
       }
     
       // Opposite of serializeMapStatuses.
       def deserializeMapStatuses(bytes: Array[Byte]): Array[MapStatus] = {
    -    val objIn = new ObjectInputStream(new GZIPInputStream(new ByteArrayInputStream(bytes)))
    -    Utils.tryWithSafeFinally {
    -      objIn.readObject().asInstanceOf[Array[MapStatus]]
    -    } {
    -      objIn.close()
    +    assert (bytes.length > 0)
    +
    +    def deserializeObject(arr: Array[Byte], off: Int, len: Int): AnyRef = {
    +      val objIn = new ObjectInputStream(new GZIPInputStream(
    +        new ByteArrayInputStream(arr, off, len)))
    +      Utils.tryWithSafeFinally {
    +        objIn.readObject()
    +      } {
    +        objIn.close()
    +      }
    +    }
    +
    +    bytes(0) match {
    +      case DIRECT =>
    +        deserializeObject(bytes, 1, bytes.length - 1).asInstanceOf[Array[MapStatus]]
    +      case BROADCAST =>
    +        // deserialize the Broadcast, pull .value array out of it, and then deserialize that
    +        val bcast = deserializeObject(bytes, 1, bytes.length - 1).
    +          asInstanceOf[Broadcast[Array[Byte]]]
    +        logInfo("Broadcast mapstatuses size = " + bytes.length +
    +          ", actual size = " + bcast.value.length)
    +        // Important - ignore the DIRECT tag ! Start from offset 1
    +        deserializeObject(bcast.value, 1, bcast.value.length - 1).asInstanceOf[Array[MapStatus]]
    --- End diff --
    
    Sorry, my English is poor. Judging from the code of `TorrentBroadcast.readBlocks` and `Dispatcher.MessageLoop`, it is possible that most of the GetLocations messages are processed ahead of the UpdateBlockInfo messages.
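
The tag-byte framing used by `serializeMapStatuses` and `deserializeMapStatuses` in the diff above can be sketched without Spark. This is a sketch under assumptions: the numeric values of `DIRECT` and `BROADCAST` are made up here (the diff only shows the names), `TaggedFraming`, `encode`, `decode`, and `unwrap` are hypothetical helpers, and the inner byte array stands in for `Broadcast.value`, which in the real code is fetched through the broadcast mechanism.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.io.{ObjectInputStream, ObjectOutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object TaggedFraming {
  // Tag values are assumptions for this sketch.
  val DIRECT: Byte = 0
  val BROADCAST: Byte = 1

  // Write one tag byte, then the gzip-compressed, Java-serialized payload.
  def encode(tag: Byte, payload: AnyRef): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    out.write(tag.toInt)
    val objOut = new ObjectOutputStream(new GZIPOutputStream(out))
    try {
      objOut.writeObject(payload)
    } finally {
      objOut.close()
    }
    out.toByteArray
  }

  // Read the tag from byte 0 and deserialize everything from offset 1 onwards.
  def decode(bytes: Array[Byte]): (Byte, AnyRef) = {
    require(bytes.length > 0, "empty message")
    val objIn = new ObjectInputStream(new GZIPInputStream(
      new ByteArrayInputStream(bytes, 1, bytes.length - 1)))
    try {
      (bytes(0), objIn.readObject())
    } finally {
      objIn.close()
    }
  }

  // Mirrors the two cases in the diff: a DIRECT message carries the payload,
  // a BROADCAST message carries something that leads to a DIRECT-tagged array.
  def unwrap(bytes: Array[Byte]): AnyRef = decode(bytes) match {
    case (DIRECT, obj) => obj
    case (BROADCAST, inner: Array[Byte]) => unwrap(inner)
    case (tag, _) => throw new IllegalArgumentException(s"unexpected tag $tag")
  }
}
```

The point of the framing is that the receiver only needs the first byte to decide which path to take, and the broadcast payload keeps its own `DIRECT` tag, which is why the diff deserializes it from offset 1 as well.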




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12113#discussion_r62361365
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -296,10 +290,86 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
       protected val mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
       private val cachedSerializedStatuses = new ConcurrentHashMap[Int, Array[Byte]]().asScala
     
    +  private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
    +
    +  // Kept in sync with cachedSerializedStatuses explicitly
    +  // This is required so that the Broadcast variable remains in scope until we remove
    +  // the shuffleId explicitly or implicitly.
    +  private val cachedSerializedBroadcast = new HashMap[Int, Broadcast[Array[Byte]]]()
    +
    +  // This is to prevent multiple serializations of the same shuffle - which happens when
    +  // there is a request storm when shuffle start.
    +  private val shuffleIdLocks = new ConcurrentHashMap[Int, AnyRef]()
    +
    +  // requests for map output statuses
    +  private val mapOutputRequests = new LinkedBlockingQueue[GetMapOutputMessage]
    +
    +  // Thread pool used for handling map output status requests. This is a separate thread pool
    +  // to ensure we don't block the normal dispatcher threads.
    +  private val threadpool: ThreadPoolExecutor = {
    +    val numThreads = conf.getInt("spark.shuffle.mapOutput.dispatcher.numThreads", 8)
    +    val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "map-output-dispatcher")
    +    for (i <- 0 until numThreads) {
    +      pool.execute(new MessageLoop)
    +    }
    +    pool
    +  }
    +
    +  // Make sure that that we aren't going to exceed the max RPC message size by making sure
    +  // we use broadcast to send large map output statuses.
    +  if (minSizeForBroadcast > maxRpcMessageSize) {
    +    val msg = s"spark.shuffle.mapOutput.minSizeForBroadcast ($minSizeForBroadcast bytes) must " +
    +      s"be <= spark.rpc.message.maxSize ($maxRpcMessageSize bytes) to prevent sending an rpc " +
    +      "message that is to large."
    +    logError(msg)
    +    throw new IllegalArgumentException(msg)
    --- End diff --
    
    I think this is all personal opinion, and in some situations I agree. But in this case a human set what I would consider an advanced config to a bad value, and we should tell them upfront rather than have them expect one thing and get another.  If they are changing advanced configs they are probably tuning their job, and thus shouldn't be frustrated by us telling them they are doing something wrong.  It doesn't come like this out of the box when a user is running a simple wordcount or SQL statement.
    
    If this is all that is keeping this from being committed I will change it though, as to me it's not worth discussing anymore.




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12113#discussion_r62358876
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -296,10 +290,86 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
       protected val mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
       private val cachedSerializedStatuses = new ConcurrentHashMap[Int, Array[Byte]]().asScala
     
    +  private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
    +
    +  // Kept in sync with cachedSerializedStatuses explicitly
    +  // This is required so that the Broadcast variable remains in scope until we remove
    +  // the shuffleId explicitly or implicitly.
    +  private val cachedSerializedBroadcast = new HashMap[Int, Broadcast[Array[Byte]]]()
    +
    +  // This is to prevent multiple serializations of the same shuffle - which happens when
    +  // there is a request storm when shuffle start.
    +  private val shuffleIdLocks = new ConcurrentHashMap[Int, AnyRef]()
    +
    +  // requests for map output statuses
    +  private val mapOutputRequests = new LinkedBlockingQueue[GetMapOutputMessage]
    +
    +  // Thread pool used for handling map output status requests. This is a separate thread pool
    +  // to ensure we don't block the normal dispatcher threads.
    +  private val threadpool: ThreadPoolExecutor = {
    +    val numThreads = conf.getInt("spark.shuffle.mapOutput.dispatcher.numThreads", 8)
    +    val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "map-output-dispatcher")
    +    for (i <- 0 until numThreads) {
    +      pool.execute(new MessageLoop)
    +    }
    +    pool
    +  }
    +
    +  // Make sure that that we aren't going to exceed the max RPC message size by making sure
    +  // we use broadcast to send large map output statuses.
    +  if (minSizeForBroadcast > maxRpcMessageSize) {
    +    val msg = s"spark.shuffle.mapOutput.minSizeForBroadcast ($minSizeForBroadcast bytes) must " +
    +      s"be <= spark.rpc.message.maxSize ($maxRpcMessageSize bytes) to prevent sending an rpc " +
    +      "message that is to large."
    +    logError(msg)
    +    throw new IllegalArgumentException(msg)
    --- End diff --
    
    If we introduce a constraint that could easily be resolved by the computer, I think it's better to just fix it automatically than to tell the human. Any exception would make users frustrated; not all users have the patience to read the error message carefully.
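
The alternative suggested here, adjusting the value instead of throwing, could look roughly like the following. This is a sketch only: `BroadcastThreshold` and `effectiveMinSizeForBroadcast` are hypothetical names, the warning text is invented, and nothing in this thread says which behaviour the patch finally adopted.

```scala
object BroadcastThreshold {
  // Returns the threshold to use, plus an optional warning for the caller to log.
  // Instead of throwing IllegalArgumentException as in the diff, an oversized
  // threshold is clamped to the maximum RPC message size.
  def effectiveMinSizeForBroadcast(
      minSizeForBroadcast: Long,
      maxRpcMessageSize: Long): (Long, Option[String]) = {
    if (minSizeForBroadcast > maxRpcMessageSize) {
      val warning =
        s"minSizeForBroadcast ($minSizeForBroadcast bytes) exceeds the max RPC " +
        s"message size ($maxRpcMessageSize bytes); using $maxRpcMessageSize instead."
      (maxRpcMessageSize, Some(warning))
    } else {
      (minSizeForBroadcast, None)
    }
  }
}
```

The trade-off is the one argued in this exchange: clamping keeps the job running but silently changes what the user configured unless they read the log, while throwing surfaces the mistake immediately.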




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12113#issuecomment-217283469
  
    **[Test build #57912 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/57912/consoleFull)** for PR 12113 at commit [`0382155`](https://github.com/apache/spark/commit/0382155ef73437e490abdf83af2b232adbab0eb6).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12113#issuecomment-207081438
  
    **[Test build #55244 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/55244/consoleFull)** for PR 12113 at commit [`4d46b96`](https://github.com/apache/spark/commit/4d46b9607e0ea666abd01b32acd9a244eacd1570).




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12113#discussion_r58643596
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -492,16 +624,51 @@ private[spark] object MapOutputTracker extends Logging {
         } {
           objOut.close()
         }
    -    out.toByteArray
    +    val arr = out.toByteArray
    +    if (minBroadcastSize >= 0 && arr.length >= minBroadcastSize) {
    +      // Use broadcast instead.
    +      // Important arr(0) is the tag == DIRECT, ignore that while deserializing !
    +      val bcast = broadcastManager.newBroadcast(arr, isLocal)
    +      // toByteArray creates copy, so we can reuse out
    +      out.reset()
    +      out.write(BROADCAST)
    +      val oos = new ObjectOutputStream(new GZIPOutputStream(out))
    +      oos.writeObject(bcast)
    +      oos.close()
    +      val outArr = out.toByteArray
    +      logInfo("Broadcast mapstatuses size = " + outArr.length + ", actual size = " + arr.length)
    +      (outArr, bcast)
    +    } else {
    +      (arr, null)
    +    }
       }
     
       // Opposite of serializeMapStatuses.
       def deserializeMapStatuses(bytes: Array[Byte]): Array[MapStatus] = {
    -    val objIn = new ObjectInputStream(new GZIPInputStream(new ByteArrayInputStream(bytes)))
    -    Utils.tryWithSafeFinally {
    -      objIn.readObject().asInstanceOf[Array[MapStatus]]
    -    } {
    -      objIn.close()
    +    assert (bytes.length > 0)
    +
    +    def deserializeObject(arr: Array[Byte], off: Int, len: Int): AnyRef = {
    +      val objIn = new ObjectInputStream(new GZIPInputStream(
    +        new ByteArrayInputStream(arr, off, len)))
    +      Utils.tryWithSafeFinally {
    +        objIn.readObject()
    +      } {
    +        objIn.close()
    +      }
    +    }
    +
    +    bytes(0) match {
    +      case DIRECT =>
    +        deserializeObject(bytes, 1, bytes.length - 1).asInstanceOf[Array[MapStatus]]
    +      case BROADCAST =>
    +        // deserialize the Broadcast, pull .value array out of it, and then deserialize that
    +        val bcast = deserializeObject(bytes, 1, bytes.length - 1).
    +          asInstanceOf[Broadcast[Array[Byte]]]
    +        logInfo("Broadcast mapstatuses size = " + bytes.length +
    +          ", actual size = " + bcast.value.length)
    +        // Important - ignore the DIRECT tag ! Start from offset 1
    +        deserializeObject(bcast.value, 1, bcast.value.length - 1).asInstanceOf[Array[MapStatus]]
    --- End diff --
    
    What happens if this happens at the same time on different executors?




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12113#issuecomment-206618616
  
    Build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12113#issuecomment-204561553
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/54712/
    Test FAILed.




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12113#discussion_r58502169
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -296,10 +290,89 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
       protected val mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
       private val cachedSerializedStatuses = new ConcurrentHashMap[Int, Array[Byte]]().asScala
     
    +  private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
    +
    +  // Kept in sync with cachedSerializedStatuses explicitly
    +  // This is required so that the Broadcast variable remains in scope until we remove
    +  // the shuffleId explicitly or implicitly.
    +  private val cachedSerializedBroadcast = new HashMap[Int, Broadcast[Array[Byte]]]()
    +
    +  // This is to prevent multiple serializations of the same shuffle - which happens when
    +  // there is a request storm when shuffle start.
    +  private val shuffleIdLocks = new ConcurrentHashMap[Int, AnyRef]()
    +
    +  // requests for map output statuses
    +  private val mapOutputRequests = new LinkedBlockingQueue[GetMapOutputMessage]
    +
    +  // Thread pool used for handling map output status requests. This is a separate thread pool
    +  // to ensure we don't block the normal dispatcher threads.
    +  private val threadpool: ThreadPoolExecutor = {
    +    val numThreads = conf.getInt("spark.shuffle.mapOutput.dispatcher.numThreads", 8)
    +    val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "map-output-dispatcher")
    +    for (i <- 0 until numThreads) {
    +      pool.execute(new MessageLoop)
    +    }
    +    pool
    +  }
    +
    +  def post(message: GetMapOutputMessage): Unit = {
    +    mapOutputRequests.offer(message)
    +  }
    +
    +  /** Message loop used for dispatching messages. */
    +  private class MessageLoop extends Runnable {
    +    override def run(): Unit = {
    +      try {
    +        while (true) {
    +          try {
    +            val data = mapOutputRequests.take()
    +             if (data == PoisonPill) {
    +              // Put PoisonPill back so that other MessageLoops can see it.
    +              mapOutputRequests.offer(PoisonPill)
    +              return
    +            }
    +            val context = data.context
    +            val shuffleId = data.shuffleId
    +            val hostPort = context.senderAddress.hostPort
    +            logDebug("Handling request to send map output locations for shuffle " + shuffleId +
    +              " to " + hostPort)
    +            val mapOutputStatuses = getSerializedMapOutputStatuses(shuffleId)
    --- End diff --
    
    Yes, I agree with you. But I have actually seen a serialized mapstatus array of 190MB, which takes 7 seconds.
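
The queue-plus-worker-threads pattern in the diff above, including the PoisonPill shutdown, can be sketched on its own. This is a minimal sketch, not the patch: `PoisonPillLoop`, `Request`, and `drain` are hypothetical names, a plain fixed thread pool replaces Spark's `ThreadUtils` helper, and incrementing a counter stands in for serializing the statuses and replying.

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object PoisonPillLoop {
  // Hypothetical message type; the diff uses GetMapOutputMessage.
  final case class Request(shuffleId: Int)

  // Sentinel compared by reference, so no real request can be mistaken for it.
  val PoisonPill: Request = Request(-99)

  // Runs numThreads workers over a queue and returns how many requests were handled.
  def drain(requests: Seq[Request], numThreads: Int): Int = {
    val queue = new LinkedBlockingQueue[Request]()
    val handled = new AtomicInteger(0)
    val pool = Executors.newFixedThreadPool(numThreads)

    val worker = new Runnable {
      override def run(): Unit = {
        var running = true
        while (running) {
          val msg = queue.take()
          if (msg eq PoisonPill) {
            // Put the pill back so the remaining workers also see it and exit.
            queue.offer(PoisonPill)
            running = false
          } else {
            handled.incrementAndGet() // stand-in for serialize-and-reply
          }
        }
      }
    }

    (1 to numThreads).foreach(_ => pool.execute(worker))
    requests.foreach(r => queue.offer(r))
    // Enqueued last, so every earlier request is taken before any worker stops.
    queue.offer(PoisonPill)
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
    handled.get()
  }
}
```

Because a slow request, such as the multi-second serialization mentioned above, only occupies one worker, the other workers keep taking requests from the queue in the meantime.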




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the pull request:

    https://github.com/apache/spark/pull/12113#issuecomment-217176362
  
    Note: I'll upmerge and make the changes discussed; I should have the rework up later today.




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/12113#issuecomment-217514209
  
    The broadcast workaround looks ok to me, but does the thread pool really help in the non-broadcast case?
    
    The message loop eventually just calls `context.reply(mapOutputStatuses)`; I don't see anywhere where it's actually blocking to avoid filling up the message queues in the RPC layer. So if you're under the broadcast limit, and you have lots and lots of executors, you still could run out of memory, right?
    
    On a side note, I remember from a long time ago that there was talk about embedding this information in the tasks sent to executors (i.e. the task would contain the location of the shuffle blocks needed to finish that task), instead of having every executor fetch the whole output map. Did you consider that option when looking at this? Without too much thinking it seems like it would be a better (and simpler?) long-term solution, but maybe I'm missing something.




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12113#issuecomment-204521028
  
    **[Test build #54712 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/54712/consoleFull)** for PR 12113 at commit [`3c1def0`](https://github.com/apache/spark/commit/3c1def02b80ebfed55904e504609aa02de6559ce).




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12113#discussion_r58324785
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -428,40 +503,93 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
         }
       }
     
    +  private def removeBroadcast(bcast: Broadcast[_]): Unit = {
    +    if (null != bcast) {
    +      broadcastManager.unbroadcast(bcast.id,
    +        removeFromDriver = true, blocking = false)
    +    }
    +  }
    +
    +  private def clearCachedBroadcast(): Unit = {
    +    for (cached <- cachedSerializedBroadcast) removeBroadcast(cached._2)
    +    cachedSerializedBroadcast.clear()
    +  }
    +
       def getSerializedMapOutputStatuses(shuffleId: Int): Array[Byte] = {
         var statuses: Array[MapStatus] = null
         var epochGotten: Long = -1
         epochLock.synchronized {
           if (epoch > cacheEpoch) {
             cachedSerializedStatuses.clear()
    +        clearCachedBroadcast()
             cacheEpoch = epoch
           }
           cachedSerializedStatuses.get(shuffleId) match {
             case Some(bytes) =>
               return bytes
             case None =>
    +          logDebug("cached status not found for : " + shuffleId)
               statuses = mapStatuses.getOrElse(shuffleId, Array[MapStatus]())
               epochGotten = epoch
           }
         }
    -    // If we got here, we failed to find the serialized locations in the cache, so we pulled
    -    // out a snapshot of the locations as "statuses"; let's serialize and return that
    -    val bytes = MapOutputTracker.serializeMapStatuses(statuses)
    -    logInfo("Size of output statuses for shuffle %d is %d bytes".format(shuffleId, bytes.length))
    -    // Add them into the table only if the epoch hasn't changed while we were working
    -    epochLock.synchronized {
    -      if (epoch == epochGotten) {
    -        cachedSerializedStatuses(shuffleId) = bytes
    +
    +    var shuffleIdLock = shuffleIdLocks.get(shuffleId)
    +    if (null == shuffleIdLock) {
    +      val newLock = new Object()
    +      // in general, this condition should be false - but good to be paranoid
    +      val prevLock = shuffleIdLocks.putIfAbsent(shuffleId, newLock)
    +      shuffleIdLock = if (null != prevLock) prevLock else newLock
    +    }
    +    val newbytes = shuffleIdLock.synchronized {
    +
    +      // double check to make sure someone else didn't serialize and cache the same
    +      // mapstatus while we were waiting on the synchronize
    +      epochLock.synchronized {
    +        if (epoch > cacheEpoch) {
    +          cachedSerializedStatuses.clear()
    +          clearCachedBroadcast()
    +          cacheEpoch = epoch
    +        }
    +        cachedSerializedStatuses.get(shuffleId) match {
    +          case Some(bytes) =>
    +            return bytes
    +          case None =>
    +            logDebug("shuffle lock cached status not found for : " + shuffleId)
    +            statuses = mapStatuses.getOrElse(shuffleId, Array[MapStatus]())
    +            epochGotten = epoch
    +        }
    +      }
    +
    +      // If we got here, we failed to find the serialized locations in the cache, so we pulled
    +      // out a snapshot of the locations as "statuses"; let's serialize and return that
    +      val (bytes, bcast) = MapOutputTracker.serializeMapStatuses(statuses, broadcastManager,
    +        isLocal, minSizeForBroadcast)
    +      logInfo("Size of output statuses for shuffle %d is %d bytes".format(shuffleId, bytes.length))
    +      // Add them into the table only if the epoch hasn't changed while we were working
    +      epochLock.synchronized {
    +        if (epoch == epochGotten) {
    +          cachedSerializedStatuses(shuffleId) = bytes
    +          if (null != bcast) cachedSerializedBroadcast(shuffleId) = bcast
    +        } else {
    +          logInfo("Epoch changed, not caching!")
    +          removeBroadcast(bcast)
    +        }
           }
    +      bytes
         }
    -    bytes
    +    newbytes
       }
     
       override def stop() {
    +    mapOutputRequests.offer(PoisonPill)
    +    threadpool.shutdown()
         sendTracker(StopMapOutputTracker)
         mapStatuses.clear()
         trackerEndpoint = null
         cachedSerializedStatuses.clear()
    +    clearCachedBroadcast()
    --- End diff --
    
    `BroadcastManager` has already stopped at this point, right? If so, the following message may be logged:
    
    `SparkListenerBus has already stopped! Dropping event .....`
    
    LiveListenerBus:
    
    ```scala
    
      def post(event: SparkListenerEvent): Unit = {
        if (stopped.get) {
          // Drop further events to make `listenerThread` exit ASAP
          logError(s"$name has already stopped! Dropping event $event")
          return
        }
        val eventAdded = eventQueue.offer(event)
        if (eventAdded) {
          eventLock.release()
        } else {
          onDropEvent(event)
        }
      }
    ```




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12113#discussion_r62362600
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -296,10 +290,86 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
       protected val mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
       private val cachedSerializedStatuses = new ConcurrentHashMap[Int, Array[Byte]]().asScala
     
    +  private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
    +
    +  // Kept in sync with cachedSerializedStatuses explicitly
    +  // This is required so that the Broadcast variable remains in scope until we remove
    +  // the shuffleId explicitly or implicitly.
    +  private val cachedSerializedBroadcast = new HashMap[Int, Broadcast[Array[Byte]]]()
    +
    +  // This is to prevent multiple serializations of the same shuffle - which happens when
    +  // there is a request storm when shuffle start.
    +  private val shuffleIdLocks = new ConcurrentHashMap[Int, AnyRef]()
    +
    +  // requests for map output statuses
    +  private val mapOutputRequests = new LinkedBlockingQueue[GetMapOutputMessage]
    +
    +  // Thread pool used for handling map output status requests. This is a separate thread pool
    +  // to ensure we don't block the normal dispatcher threads.
    +  private val threadpool: ThreadPoolExecutor = {
    +    val numThreads = conf.getInt("spark.shuffle.mapOutput.dispatcher.numThreads", 8)
    +    val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "map-output-dispatcher")
    +    for (i <- 0 until numThreads) {
    +      pool.execute(new MessageLoop)
    +    }
    +    pool
    +  }
    +
    +  // Make sure that that we aren't going to exceed the max RPC message size by making sure
    +  // we use broadcast to send large map output statuses.
    +  if (minSizeForBroadcast > maxRpcMessageSize) {
    +    val msg = s"spark.shuffle.mapOutput.minSizeForBroadcast ($minSizeForBroadcast bytes) must " +
    +      s"be <= spark.rpc.message.maxSize ($maxRpcMessageSize bytes) to prevent sending an rpc " +
    +      "message that is to large."
    +    logError(msg)
    +    throw new IllegalArgumentException(msg)
    --- End diff --
    
    I'm OK with it (they are not consistent in many places). 




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12113#discussion_r62111985
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -428,40 +503,89 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
         }
       }
     
    +  private def removeBroadcast(bcast: Broadcast[_]): Unit = {
    +    if (null != bcast) {
    +      broadcastManager.unbroadcast(bcast.id,
    +        removeFromDriver = true, blocking = false)
    +    }
    +  }
    +
    +  private def clearCachedBroadcast(): Unit = {
    +    for (cached <- cachedSerializedBroadcast) removeBroadcast(cached._2)
    +    cachedSerializedBroadcast.clear()
    +  }
    +
       def getSerializedMapOutputStatuses(shuffleId: Int): Array[Byte] = {
         var statuses: Array[MapStatus] = null
    +    var retBytes: Array[Byte] = null
         var epochGotten: Long = -1
    -    epochLock.synchronized {
    -      if (epoch > cacheEpoch) {
    -        cachedSerializedStatuses.clear()
    -        cacheEpoch = epoch
    -      }
    -      cachedSerializedStatuses.get(shuffleId) match {
    -        case Some(bytes) =>
    -          return bytes
    -        case None =>
    -          statuses = mapStatuses.getOrElse(shuffleId, Array[MapStatus]())
    -          epochGotten = epoch
    +
    +    // Check to see if we have a cached version, returns true if it does
    +    // and has side effect of setting retBytes.  If not returns false
    +    // with side effect of setting statuses
    +    def checkCachedStatuses(): Boolean = {
    +      epochLock.synchronized {
    +        if (epoch > cacheEpoch) {
    +          cachedSerializedStatuses.clear()
    +          clearCachedBroadcast()
    +          cacheEpoch = epoch
    +        }
    +        cachedSerializedStatuses.get(shuffleId) match {
    +          case Some(bytes) =>
    +            retBytes = bytes
    +            true
    +          case None =>
    +            logDebug("cached status not found for : " + shuffleId)
    +            statuses = mapStatuses.getOrElse(shuffleId, Array[MapStatus]())
    +            epochGotten = epoch
    +            false
    +        }
           }
         }
    -    // If we got here, we failed to find the serialized locations in the cache, so we pulled
    -    // out a snapshot of the locations as "statuses"; let's serialize and return that
    -    val bytes = MapOutputTracker.serializeMapStatuses(statuses)
    -    logInfo("Size of output statuses for shuffle %d is %d bytes".format(shuffleId, bytes.length))
    -    // Add them into the table only if the epoch hasn't changed while we were working
    -    epochLock.synchronized {
    -      if (epoch == epochGotten) {
    -        cachedSerializedStatuses(shuffleId) = bytes
    +
    +    if (checkCachedStatuses()) return retBytes
    +    var shuffleIdLock = shuffleIdLocks.get(shuffleId)
    +    if (null == shuffleIdLock) {
    +      val newLock = new Object()
    +      // in general, this condition should be false - but good to be paranoid
    +      val prevLock = shuffleIdLocks.putIfAbsent(shuffleId, newLock)
    --- End diff --
    
    We already have a few synchronizations; adding another synchronization here could avoid this trick.
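    
    One way to read this suggestion, as a minimal sketch rather than the PR's code: guard a plain mutable map with a `synchronized` block so that lookup-or-create is atomic and the `putIfAbsent` fallback is no longer needed. `ShuffleLocks` and `lockFor` are hypothetical names introduced only for illustration.
    
    ```scala
    import scala.collection.mutable

    // Hypothetical helper, not code from the PR: hands out one lock object per shuffle id.
    object ShuffleLocks {
      private val locks = mutable.HashMap.empty[Int, AnyRef]

      // The synchronized block makes "look up, else create" a single atomic step,
      // so two threads can never end up with different locks for the same shuffle.
      def lockFor(shuffleId: Int): AnyRef = locks.synchronized {
        locks.getOrElseUpdate(shuffleId, new Object)
      }
    }
    ```
    
    The trade-off is one short critical section per request instead of a lock-free read on the common path, which is probably acceptable given how cheap the map lookup is.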




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12113#discussion_r62115610
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -296,10 +290,89 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
       protected val mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
       private val cachedSerializedStatuses = new ConcurrentHashMap[Int, Array[Byte]]().asScala
     
    +  private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
    +
    +  // Kept in sync with cachedSerializedStatuses explicitly
    +  // This is required so that the Broadcast variable remains in scope until we remove
    +  // the shuffleId explicitly or implicitly.
    +  private val cachedSerializedBroadcast = new HashMap[Int, Broadcast[Array[Byte]]]()
    +
    +  // This is to prevent multiple serializations of the same shuffle - which happens when
    +  // there is a request storm when shuffle start.
    +  private val shuffleIdLocks = new ConcurrentHashMap[Int, AnyRef]()
    +
    +  // requests for map output statuses
    +  private val mapOutputRequests = new LinkedBlockingQueue[GetMapOutputMessage]
    +
    +  // Thread pool used for handling map output status requests. This is a separate thread pool
    +  // to ensure we don't block the normal dispatcher threads.
    +  private val threadpool: ThreadPoolExecutor = {
    +    val numThreads = conf.getInt("spark.shuffle.mapOutput.dispatcher.numThreads", 8)
    +    val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "map-output-dispatcher")
    +    for (i <- 0 until numThreads) {
    +      pool.execute(new MessageLoop)
    +    }
    +    pool
    +  }
    +
    +  def post(message: GetMapOutputMessage): Unit = {
    +    mapOutputRequests.offer(message)
    +  }
    +
    +  /** Message loop used for dispatching messages. */
    +  private class MessageLoop extends Runnable {
    +    override def run(): Unit = {
    +      try {
    +        while (true) {
    +          try {
    +            val data = mapOutputRequests.take()
    +             if (data == PoisonPill) {
    +              // Put PoisonPill back so that other MessageLoops can see it.
    +              mapOutputRequests.offer(PoisonPill)
    +              return
    +            }
    +            val context = data.context
    +            val shuffleId = data.shuffleId
    +            val hostPort = context.senderAddress.hostPort
    +            logDebug("Handling request to send map output locations for shuffle " + shuffleId +
    +              " to " + hostPort)
    +            val mapOutputStatuses = getSerializedMapOutputStatuses(shuffleId)
    +            val serializedSize = mapOutputStatuses.length
    +            if (serializedSize > maxRpcMessageSize) {
    +              val msg = s"Map output statuses were $serializedSize bytes which " +
    +                s"exceeds spark.rpc.message.maxSize ($maxRpcMessageSize bytes)."
    +
    +              // For SPARK-1244 we'll opt for just logging an error and then sending it to
    +              // the sender. A bigger refactoring (SPARK-1239) will ultimately remove this
    --- End diff --
    
    We could make sure that the broadcast threshold is always lower than maxSize, and remove this branch.
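    
    A minimal sketch of that invariant, assuming the two values are already available as byte counts. `BroadcastThreshold`, `validate` and `sendDirect` are hypothetical names, and the exact comparison used to pick the direct path is an assumption.
    
    ```scala
    // Hypothetical standalone check; parameter names mirror the values discussed above.
    object BroadcastThreshold {
      // Fail fast at construction time if the broadcast threshold exceeds the RPC limit.
      def validate(minSizeForBroadcast: Long, maxRpcMessageSize: Long): Unit = {
        require(minSizeForBroadcast <= maxRpcMessageSize,
          s"spark.shuffle.mapOutput.minSizeForBroadcast ($minSizeForBroadcast bytes) must be <= " +
            s"spark.rpc.message.maxSize ($maxRpcMessageSize bytes)")
      }

      // Assumption: payloads below the threshold go out directly, the rest via broadcast.
      // Given the invariant above, a direct payload always fits in one RPC message.
      def sendDirect(serializedSize: Long, minSizeForBroadcast: Long): Boolean =
        serializedSize < minSizeForBroadcast
    }
    ```
    
    Once the check runs up front, the per-request "exceeds maxSize" branch in the message loop has nothing left to catch.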




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12113#discussion_r62363343
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -270,12 +259,17 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
     /**
      * MapOutputTracker for the driver.
      */
    -private[spark] class MapOutputTrackerMaster(conf: SparkConf)
    +private[spark] class MapOutputTrackerMaster(conf: SparkConf,
    +    broadcastManager: BroadcastManager, isLocal: Boolean)
       extends MapOutputTracker(conf) {
     
       /** Cache a serialized version of the output statuses for each shuffle to send them out faster */
       private var cacheEpoch = epoch
     
    +  // The size at which we use Broadcast to send the map output statuses to the executors
    +  private val minSizeForBroadcast = conf.getInt("spark.shuffle.mapOutput.minSizeForBroadcast",
    --- End diff --
    
    Use `conf.getSizeAsFoo` instead.
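    
    `SparkConf` has size-aware getters such as `getSizeAsBytes` and `getSizeAsKb`, which accept values like `512k` instead of a raw integer. The sketch below is not Spark's implementation; it is a hypothetical standalone parser (`SizeStrings.toBytes`) that only illustrates the idea, assuming binary multiples and a bare number meaning bytes.
    
    ```scala
    // Hypothetical stand-in for a size-aware config getter; not Spark's implementation.
    object SizeStrings {
      // Assumption: suffixes are binary multiples (k = 1024 bytes, and so on).
      private val units = Map(
        "b" -> 1L,
        "k" -> 1024L,
        "m" -> 1024L * 1024,
        "g" -> 1024L * 1024 * 1024)

      // Digits followed by an optional single-letter unit.
      private val pattern = "([0-9]+)([bkmg]?)".r

      def toBytes(value: String): Long = value.trim.toLowerCase match {
        case pattern(number, suffix) =>
          // A missing suffix is treated as bytes.
          number.toLong * units(if (suffix.isEmpty) "b" else suffix)
        case other =>
          throw new NumberFormatException(s"Invalid size string: $other")
      }
    }
    ```
    
    With a getter of this kind the default can be written as `"512k"`, and users can override it with whatever unit is convenient.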




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12113#issuecomment-217533280
  
    **[Test build #58018 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/58018/consoleFull)** for PR 12113 at commit [`2a52056`](https://github.com/apache/spark/commit/2a520569dbcd5e0ff7a974f2e981bf99ee2c29b7).




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the pull request:

    https://github.com/apache/spark/pull/12113#issuecomment-206356064
  
    cc @JoshRosen, you were on the JIRA at one point thinking of working on this. Any comments?




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12113#discussion_r58637905
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -428,40 +503,93 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
         }
       }
     
    +  private def removeBroadcast(bcast: Broadcast[_]): Unit = {
    +    if (null != bcast) {
    +      broadcastManager.unbroadcast(bcast.id,
    +        removeFromDriver = true, blocking = false)
    +    }
    +  }
    +
    +  private def clearCachedBroadcast(): Unit = {
    +    for (cached <- cachedSerializedBroadcast) removeBroadcast(cached._2)
    +    cachedSerializedBroadcast.clear()
    +  }
    +
       def getSerializedMapOutputStatuses(shuffleId: Int): Array[Byte] = {
         var statuses: Array[MapStatus] = null
         var epochGotten: Long = -1
         epochLock.synchronized {
           if (epoch > cacheEpoch) {
             cachedSerializedStatuses.clear()
    +        clearCachedBroadcast()
             cacheEpoch = epoch
           }
           cachedSerializedStatuses.get(shuffleId) match {
             case Some(bytes) =>
               return bytes
             case None =>
    +          logDebug("cached status not found for : " + shuffleId)
               statuses = mapStatuses.getOrElse(shuffleId, Array[MapStatus]())
               epochGotten = epoch
           }
         }
    -    // If we got here, we failed to find the serialized locations in the cache, so we pulled
    -    // out a snapshot of the locations as "statuses"; let's serialize and return that
    -    val bytes = MapOutputTracker.serializeMapStatuses(statuses)
    -    logInfo("Size of output statuses for shuffle %d is %d bytes".format(shuffleId, bytes.length))
    -    // Add them into the table only if the epoch hasn't changed while we were working
    -    epochLock.synchronized {
    -      if (epoch == epochGotten) {
    -        cachedSerializedStatuses(shuffleId) = bytes
    +
    +    var shuffleIdLock = shuffleIdLocks.get(shuffleId)
    +    if (null == shuffleIdLock) {
    +      val newLock = new Object()
    +      // in general, this condition should be false - but good to be paranoid
    +      val prevLock = shuffleIdLocks.putIfAbsent(shuffleId, newLock)
    +      shuffleIdLock = if (null != prevLock) prevLock else newLock
    +    }
    +    val newbytes = shuffleIdLock.synchronized {
    +
    +      // double check to make sure someone else didn't serialize and cache the same
    +      // mapstatus while we were waiting on the synchronize
    +      epochLock.synchronized {
    +        if (epoch > cacheEpoch) {
    +          cachedSerializedStatuses.clear()
    +          clearCachedBroadcast()
    +          cacheEpoch = epoch
    +        }
    +        cachedSerializedStatuses.get(shuffleId) match {
    +          case Some(bytes) =>
    +            return bytes
    +          case None =>
    +            logDebug("shuffle lock cached status not found for : " + shuffleId)
    +            statuses = mapStatuses.getOrElse(shuffleId, Array[MapStatus]())
    +            epochGotten = epoch
    +        }
    +      }
    +
    +      // If we got here, we failed to find the serialized locations in the cache, so we pulled
    +      // out a snapshot of the locations as "statuses"; let's serialize and return that
    +      val (bytes, bcast) = MapOutputTracker.serializeMapStatuses(statuses, broadcastManager,
    +        isLocal, minSizeForBroadcast)
    +      logInfo("Size of output statuses for shuffle %d is %d bytes".format(shuffleId, bytes.length))
    +      // Add them into the table only if the epoch hasn't changed while we were working
    +      epochLock.synchronized {
    +        if (epoch == epochGotten) {
    +          cachedSerializedStatuses(shuffleId) = bytes
    +          if (null != bcast) cachedSerializedBroadcast(shuffleId) = bcast
    +        } else {
    +          logInfo("Epoch changed, not caching!")
    +          removeBroadcast(bcast)
    +        }
           }
    +      bytes
         }
    -    bytes
    +    newbytes
       }
     
       override def stop() {
    +    mapOutputRequests.offer(PoisonPill)
    +    threadpool.shutdown()
         sendTracker(StopMapOutputTracker)
         mapStatuses.clear()
         trackerEndpoint = null
         cachedSerializedStatuses.clear()
    +    clearCachedBroadcast()
    --- End diff --
    
    OK.




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12113#discussion_r58953996
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -428,40 +503,89 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
         }
       }
     
    +  private def removeBroadcast(bcast: Broadcast[_]): Unit = {
    +    if (null != bcast) {
    +      broadcastManager.unbroadcast(bcast.id,
    +        removeFromDriver = true, blocking = false)
    +    }
    +  }
    +
    +  private def clearCachedBroadcast(): Unit = {
    +    for (cached <- cachedSerializedBroadcast) removeBroadcast(cached._2)
    +    cachedSerializedBroadcast.clear()
    +  }
    +
       def getSerializedMapOutputStatuses(shuffleId: Int): Array[Byte] = {
         var statuses: Array[MapStatus] = null
    +    var retBytes: Array[Byte] = null
         var epochGotten: Long = -1
    -    epochLock.synchronized {
    -      if (epoch > cacheEpoch) {
    -        cachedSerializedStatuses.clear()
    -        cacheEpoch = epoch
    -      }
    -      cachedSerializedStatuses.get(shuffleId) match {
    -        case Some(bytes) =>
    -          return bytes
    -        case None =>
    -          statuses = mapStatuses.getOrElse(shuffleId, Array[MapStatus]())
    -          epochGotten = epoch
    +
    +    // Check to see if we have a cached version, returns true if it does
    +    // and has side effect of setting retBytes.  If not returns false
    +    // with side effect of setting statuses
    +    def checkCachedStatuses(): Boolean = {
    +      epochLock.synchronized {
    +        if (epoch > cacheEpoch) {
    +          cachedSerializedStatuses.clear()
    +          clearCachedBroadcast()
    +          cacheEpoch = epoch
    +        }
    +        cachedSerializedStatuses.get(shuffleId) match {
    --- End diff --
    
    We could move this into the else branch of the epoch check, but that is probably not required given how rarely the epoch changes compared to fetches.
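    
    For illustration only, a simplified sketch of what that restructuring would look like. `EpochCache` is a hypothetical class, not the PR's `MapOutputTrackerMaster`, and the `lookups` counter exists only to make the skipped lookup visible.
    
    ```scala
    import scala.collection.mutable

    // Hypothetical, simplified cache keyed by shuffle id and invalidated by epoch.
    class EpochCache {
      private val cached = mutable.HashMap.empty[Int, Array[Byte]]
      private var cacheEpoch = 0L
      var lookups = 0 // counts how often the map is actually probed

      def put(shuffleId: Int, bytes: Array[Byte]): Unit = synchronized {
        cached(shuffleId) = bytes
      }

      def get(shuffleId: Int, epoch: Long): Option[Array[Byte]] = synchronized {
        if (epoch > cacheEpoch) {
          // The cache was just emptied, so a lookup could never hit; skip it.
          cached.clear()
          cacheEpoch = epoch
          None
        } else {
          lookups += 1
          cached.get(shuffleId)
        }
      }
    }
    ```
    
    The saving is a single map probe per epoch change, which supports the point that it is hardly worth the extra branch.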




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12113#issuecomment-206618282
  
    **[Test build #55152 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/55152/consoleFull)** for PR 12113 at commit [`fcef95b`](https://github.com/apache/spark/commit/fcef95be071dac058511616d9db2ecedea75348a).
     * This patch passes all tests.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12113#discussion_r58378772
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -296,10 +290,89 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
       protected val mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
       private val cachedSerializedStatuses = new ConcurrentHashMap[Int, Array[Byte]]().asScala
     
    +  private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
    +
    +  // Kept in sync with cachedSerializedStatuses explicitly
    +  // This is required so that the Broadcast variable remains in scope until we remove
    +  // the shuffleId explicitly or implicitly.
    +  private val cachedSerializedBroadcast = new HashMap[Int, Broadcast[Array[Byte]]]()
    +
    +  // This is to prevent multiple serializations of the same shuffle - which happens when
    +  // there is a request storm when shuffle start.
    +  private val shuffleIdLocks = new ConcurrentHashMap[Int, AnyRef]()
    +
    +  // requests for map output statuses
    +  private val mapOutputRequests = new LinkedBlockingQueue[GetMapOutputMessage]
    +
    +  // Thread pool used for handling map output status requests. This is a separate thread pool
    +  // to ensure we don't block the normal dispatcher threads.
    +  private val threadpool: ThreadPoolExecutor = {
    +    val numThreads = conf.getInt("spark.shuffle.mapOutput.dispatcher.numThreads", 8)
    +    val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "map-output-dispatcher")
    +    for (i <- 0 until numThreads) {
    +      pool.execute(new MessageLoop)
    +    }
    +    pool
    +  }
    +
    +  def post(message: GetMapOutputMessage): Unit = {
    +    mapOutputRequests.offer(message)
    +  }
    +
    +  /** Message loop used for dispatching messages. */
    +  private class MessageLoop extends Runnable {
    +    override def run(): Unit = {
    +      try {
    +        while (true) {
    +          try {
    +            val data = mapOutputRequests.take()
    +             if (data == PoisonPill) {
    +              // Put PoisonPill back so that other MessageLoops can see it.
    +              mapOutputRequests.offer(PoisonPill)
    +              return
    +            }
    +            val context = data.context
    +            val shuffleId = data.shuffleId
    +            val hostPort = context.senderAddress.hostPort
    +            logDebug("Handling request to send map output locations for shuffle " + shuffleId +
    +              " to " + hostPort)
    +            val mapOutputStatuses = getSerializedMapOutputStatuses(shuffleId)
    --- End diff --
    
    Can you clarify your comment/concern here?
    
    Yes, getSerializedMapOutputStatuses can block, but only for roughly the time it takes to serialize the map output statuses. It could obviously take a little longer if the request is backed up in the queue, but that should be minimal since at that point the result is cached. This is really no different than the behavior before this change; either way you have to serialize the map output statuses.
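
The quoted diff keeps this blocking work off the dispatcher threads by queueing requests and draining them on a dedicated pool, with a PoisonPill sentinel to stop the workers. A minimal sketch of that pattern follows. It is not the Spark code: `Request`, `PoisonPillDemo`, and `drain` are hypothetical names, and the counter stands in for the possibly slow serialization step.

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical request type; the real GetMapOutputMessage also carries an RPC context.
final case class Request(shuffleId: Int)

object PoisonPillDemo {
  // Sentinel that tells every worker to exit. It is compared by reference (eq),
  // because case-class == would also match an ordinary Request(-1).
  private val PoisonPill = Request(-1)

  /** Drains `requests` on `numThreads` workers and returns how many were handled. */
  def drain(requests: Seq[Request], numThreads: Int): Int = {
    val queue = new LinkedBlockingQueue[Request]()
    val handled = new AtomicInteger(0)
    val pool = Executors.newFixedThreadPool(numThreads)

    val loop = new Runnable {
      override def run(): Unit = {
        var running = true
        while (running) {
          val msg = queue.take() // blocks this worker only, never the thread calling offer()
          if (msg eq PoisonPill) {
            // Put the pill back so the remaining workers also see it and stop.
            queue.offer(PoisonPill)
            running = false
          } else {
            handled.incrementAndGet() // placeholder for the serialization work
          }
        }
      }
    }
    (0 until numThreads).foreach(_ => pool.execute(loop))

    requests.foreach(r => queue.offer(r))
    queue.offer(PoisonPill) // FIFO queue: every earlier request is taken before the pill
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
    handled.get()
  }
}
```

Because offer() on an unbounded LinkedBlockingQueue does not block, the thread that posts a request returns immediately; only the pool's own threads wait on serialization.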


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/12113#issuecomment-217600007
  
    Merging this into master and the 2.0 branch, thanks!


[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12113#discussion_r62361823
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -428,40 +503,89 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
         }
       }
     
    +  private def removeBroadcast(bcast: Broadcast[_]): Unit = {
    +    if (null != bcast) {
    +      broadcastManager.unbroadcast(bcast.id,
    +        removeFromDriver = true, blocking = false)
    +    }
    +  }
    +
    +  private def clearCachedBroadcast(): Unit = {
    +    for (cached <- cachedSerializedBroadcast) removeBroadcast(cached._2)
    +    cachedSerializedBroadcast.clear()
    +  }
    +
       def getSerializedMapOutputStatuses(shuffleId: Int): Array[Byte] = {
         var statuses: Array[MapStatus] = null
    +    var retBytes: Array[Byte] = null
         var epochGotten: Long = -1
    -    epochLock.synchronized {
    -      if (epoch > cacheEpoch) {
    -        cachedSerializedStatuses.clear()
    -        cacheEpoch = epoch
    -      }
    -      cachedSerializedStatuses.get(shuffleId) match {
    -        case Some(bytes) =>
    -          return bytes
    -        case None =>
    -          statuses = mapStatuses.getOrElse(shuffleId, Array[MapStatus]())
    -          epochGotten = epoch
    +
    +    // Check to see if we have a cached version, returns true if it does
    +    // and has side effect of setting retBytes.  If not returns false
    +    // with side effect of setting statuses
    +    def checkCachedStatuses(): Boolean = {
    +      epochLock.synchronized {
    +        if (epoch > cacheEpoch) {
    +          cachedSerializedStatuses.clear()
    +          clearCachedBroadcast()
    +          cacheEpoch = epoch
    +        }
    +        cachedSerializedStatuses.get(shuffleId) match {
    +          case Some(bytes) =>
    +            retBytes = bytes
    +            true
    +          case None =>
    +            logDebug("cached status not found for : " + shuffleId)
    +            statuses = mapStatuses.getOrElse(shuffleId, Array[MapStatus]())
    +            epochGotten = epoch
    +            false
    +        }
           }
         }
    -    // If we got here, we failed to find the serialized locations in the cache, so we pulled
    -    // out a snapshot of the locations as "statuses"; let's serialize and return that
    -    val bytes = MapOutputTracker.serializeMapStatuses(statuses)
    -    logInfo("Size of output statuses for shuffle %d is %d bytes".format(shuffleId, bytes.length))
    -    // Add them into the table only if the epoch hasn't changed while we were working
    -    epochLock.synchronized {
    -      if (epoch == epochGotten) {
    -        cachedSerializedStatuses(shuffleId) = bytes
    +
    +    if (checkCachedStatuses()) return retBytes
    +    var shuffleIdLock = shuffleIdLocks.get(shuffleId)
    +    if (null == shuffleIdLock) {
    +      val newLock = new Object()
    +      // in general, this condition should be false - but good to be paranoid
    +      val prevLock = shuffleIdLocks.putIfAbsent(shuffleId, newLock)
    --- End diff --
    
    Thanks for the explanation, that makes sense.


[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12113#discussion_r62260776
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -428,40 +503,89 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
         }
       }
     
    +  private def removeBroadcast(bcast: Broadcast[_]): Unit = {
    +    if (null != bcast) {
    +      broadcastManager.unbroadcast(bcast.id,
    +        removeFromDriver = true, blocking = false)
    +    }
    +  }
    +
    +  private def clearCachedBroadcast(): Unit = {
    +    for (cached <- cachedSerializedBroadcast) removeBroadcast(cached._2)
    +    cachedSerializedBroadcast.clear()
    +  }
    +
       def getSerializedMapOutputStatuses(shuffleId: Int): Array[Byte] = {
         var statuses: Array[MapStatus] = null
    +    var retBytes: Array[Byte] = null
         var epochGotten: Long = -1
    -    epochLock.synchronized {
    -      if (epoch > cacheEpoch) {
    -        cachedSerializedStatuses.clear()
    -        cacheEpoch = epoch
    -      }
    -      cachedSerializedStatuses.get(shuffleId) match {
    -        case Some(bytes) =>
    -          return bytes
    -        case None =>
    -          statuses = mapStatuses.getOrElse(shuffleId, Array[MapStatus]())
    -          epochGotten = epoch
    +
    +    // Check to see if we have a cached version, returns true if it does
    +    // and has side effect of setting retBytes.  If not returns false
    +    // with side effect of setting statuses
    +    def checkCachedStatuses(): Boolean = {
    +      epochLock.synchronized {
    +        if (epoch > cacheEpoch) {
    +          cachedSerializedStatuses.clear()
    +          clearCachedBroadcast()
    +          cacheEpoch = epoch
    +        }
    +        cachedSerializedStatuses.get(shuffleId) match {
    +          case Some(bytes) =>
    +            retBytes = bytes
    +            true
    +          case None =>
    +            logDebug("cached status not found for : " + shuffleId)
    +            statuses = mapStatuses.getOrElse(shuffleId, Array[MapStatus]())
    +            epochGotten = epoch
    +            false
    +        }
           }
         }
    -    // If we got here, we failed to find the serialized locations in the cache, so we pulled
    -    // out a snapshot of the locations as "statuses"; let's serialize and return that
    -    val bytes = MapOutputTracker.serializeMapStatuses(statuses)
    -    logInfo("Size of output statuses for shuffle %d is %d bytes".format(shuffleId, bytes.length))
    -    // Add them into the table only if the epoch hasn't changed while we were working
    -    epochLock.synchronized {
    -      if (epoch == epochGotten) {
    -        cachedSerializedStatuses(shuffleId) = bytes
    +
    +    if (checkCachedStatuses()) return retBytes
    +    var shuffleIdLock = shuffleIdLocks.get(shuffleId)
    +    if (null == shuffleIdLock) {
    +      val newLock = new Object()
    +      // in general, this condition should be false - but good to be paranoid
    +      val prevLock = shuffleIdLocks.putIfAbsent(shuffleId, newLock)
    --- End diff --
    
    This confuses me: why is this branch needed if we already add a lock in registerShuffle()?
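
For context, the branch in question is a get-or-create on the lock map. The quoted diff is cut off at putIfAbsent, so the sketch below assumes the branch goes on to prefer a lock that another thread installed first. It is a simplified illustration, not the patch itself: `SerializeOnceCache`, `register`, `lockFor`, and `SerializeOnceDemo` are hypothetical names, and the epoch handling is left out.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the caching path; `serialize` replaces
// MapOutputTracker.serializeMapStatuses and epochs are omitted.
class SerializeOnceCache(serialize: Int => Array[Byte]) {
  private val cache = new ConcurrentHashMap[Int, Array[Byte]]()
  private val locks = new ConcurrentHashMap[Int, AnyRef]()

  /** Pre-creates the lock, as a registration step would. */
  def register(shuffleId: Int): Unit = {
    locks.putIfAbsent(shuffleId, new Object)
    ()
  }

  /** Returns the single lock for `shuffleId`, creating it if registration was skipped. */
  def lockFor(shuffleId: Int): AnyRef = {
    val existing = locks.get(shuffleId)
    if (existing != null) existing
    else {
      val fresh = new Object
      // putIfAbsent returns the previous value, or null if ours was installed.
      val prev = locks.putIfAbsent(shuffleId, fresh)
      if (prev != null) prev else fresh
    }
  }

  def get(shuffleId: Int): Array[Byte] = {
    val cached = cache.get(shuffleId)
    if (cached != null) cached
    else lockFor(shuffleId).synchronized {
      // Re-check under the lock: a concurrent caller may have filled the cache.
      val again = cache.get(shuffleId)
      if (again != null) again
      else {
        val bytes = serialize(shuffleId)
        cache.put(shuffleId, bytes)
        bytes
      }
    }
  }
}

object SerializeOnceDemo {
  /** Fires `n` concurrent requests for one shuffle and returns the serialization count. */
  def stormCount(n: Int): Int = {
    val calls = new AtomicInteger(0)
    val cache = new SerializeOnceCache(id => { calls.incrementAndGet(); Array(id.toByte) })
    val threads = (1 to n).map(_ => new Thread(new Runnable {
      override def run(): Unit = { cache.get(42); () }
    }))
    threads.foreach(_.start())
    threads.foreach(_.join())
    calls.get()
  }
}
```

When the lock was registered up front, the null branch is never taken. In this sketch it only matters if a request arrives for an id that was never registered, in which case all callers still agree on one lock object.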


[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12113#issuecomment-217533559
  
    **[Test build #58018 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/58018/consoleFull)** for PR 12113 at commit [`2a52056`](https://github.com/apache/spark/commit/2a520569dbcd5e0ff7a974f2e981bf99ee2c29b7).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-1239] Improve fetching of map output st...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/12113

