You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/11/30 12:31:44 UTC

[GitHub] [spark] Ngone51 commented on a change in pull request #30480: [SPARK-32921][SHUFFLE] MapOutputTracker extensions to support push-based shuffle

Ngone51 commented on a change in pull request #30480:
URL: https://github.com/apache/spark/pull/30480#discussion_r532520106



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -289,7 +397,12 @@ private[spark] class MapOutputTrackerMasterEndpoint(
     case GetMapOutputStatuses(shuffleId: Int) =>
       val hostPort = context.senderAddress.hostPort
       logInfo(s"Asked to send map output locations for shuffle ${shuffleId} to ${hostPort}")

Review comment:
       Could you help clean up the redundant `{}` here?

##########
File path: core/src/main/scala/org/apache/spark/scheduler/MergeStatus.scala
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.io.{Externalizable, ObjectInput, ObjectOutput}
+
+import org.roaringbitmap.RoaringBitmap
+
+import org.apache.spark.network.shuffle.protocol.MergeStatuses
+import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.util.Utils
+
+/**
+ * The status for the result of merging shuffle partition blocks per individual shuffle partition
+ * maintained by the scheduler. The scheduler would separate the
+ * [[org.apache.spark.network.shuffle.protocol.MergeStatuses]] received from
+ * ExternalShuffleService into individual [[MergeStatus]] which is maintained inside
+ * MapOutputTracker to be served to the reducers when they start fetching shuffle partition
+ * blocks. Note that, the reducers are ultimately fetching individual chunks inside a merged
+ * shuffle file, as explained in [[org.apache.spark.network.shuffle.RemoteBlockPushResolver]].
+ * Between the scheduler maintained MergeStatus and the shuffle service maintained per shuffle
+ * partition meta file, we are effectively dividing the metadata for a push-based shuffle into
+ * 2 layers. The scheduler would track the top-level metadata at the shuffle partition level
+ * with MergeStatus, and the shuffle service would maintain the partition level metadata about
+ * how to further divide a merged shuffle partition into multiple chunks with the per-partition
+ * meta file. This helps to reduce the amount of data the scheduler needs to maintain for
+ * push-based shuffle.
+ */
+private[spark] class MergeStatus(
+    private[this] var loc: BlockManagerId,
+    private[this] var mapTracker: RoaringBitmap,
+    private[this] var size: Long)
+  extends Externalizable with OutputStatus {
+
+  protected def this() = this(null, null, -1) // For deserialization only
+
+  def location: BlockManagerId = loc
+
+  def totalSize: Long = size
+
+  def tracker: RoaringBitmap = mapTracker
+
+  /**
+   * Get the list of mapper IDs for missing mapper partition blocks that are not merged.
+   * The reducer will use this information to decide which shuffle partition blocks to
+   * fetch in the original way.
+   */
+  def getMissingMaps(numMaps: Int): Seq[Int] = {

Review comment:
       This should be a static result. Shall we declare it as a `val` variable?

##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -987,18 +1276,51 @@ private[spark] object MapOutputTracker extends Logging {
       shuffleId: Int,
       startPartition: Int,
       endPartition: Int,
-      statuses: Array[MapStatus],
+      mapStatuses: Array[MapStatus],
       startMapIndex : Int,
-      endMapIndex: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
-    assert (statuses != null)
+      endMapIndex: Int,
+      mergeStatuses: Option[Array[MergeStatus]] = None):
+      Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
+    assert (mapStatuses != null)
     val splitsByAddress = new HashMap[BlockManagerId, ListBuffer[(BlockId, Long, Int)]]
-    val iter = statuses.iterator.zipWithIndex
-    for ((status, mapIndex) <- iter.slice(startMapIndex, endMapIndex)) {
-      if (status == null) {
-        val errorMessage = s"Missing an output location for shuffle $shuffleId"
-        logError(errorMessage)
-        throw new MetadataFetchFailedException(shuffleId, startPartition, errorMessage)
-      } else {
+    // Only use MergeStatus for reduce tasks that fetch all map outputs. Since a merged shuffle
+    // partition consists of blocks merged in random order, we are unable to serve map index
+    // subrange requests. However, when a reduce task needs to fetch blocks from a subrange of
+    // map outputs, it usually indicates skewed partitions which push-based shuffle delegates
+    // to AQE to handle.
+    if (mergeStatuses.isDefined && startMapIndex == 0 && endMapIndex == mapStatuses.length) {
+      // We have MergeStatus and full range of mapIds are requested so return a merged block.
+      val numMaps = mapStatuses.length
+      mergeStatuses.get.zipWithIndex.slice(startPartition, endPartition).foreach {
+        case (mergeStatus, partId) =>
+          val remainingMapStatuses = if (mergeStatus != null) {
+            // If MergeStatus is available for the given partition, add location of the
+            // pre-merged shuffle partition for this partition ID. Here we create a
+            // ShuffleBlockId with mapId being -1 to indicate this is a merged shuffle block.
+            splitsByAddress.getOrElseUpdate(mergeStatus.location, ListBuffer()) +=

Review comment:
       Shall we ensure `mergeStatus.totalSize > 0` before updating here?

##########
File path: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
##########
@@ -28,12 +28,18 @@ import org.apache.spark.internal.config
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.Utils
 
+/**
+ * A common trait between [[MapStatus]] and [[MergeStatus]]. This allows us to reuse existing
+ * code to handle MergeStatus inside MapOutputTracker.
+ */
+private[spark] trait OutputStatus

Review comment:
       How about `ShuffleOutputStatus`?

##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -524,10 +669,37 @@ private[spark] class MapOutputTrackerMaster(
     }
   }
 
+  def registerMergeResult(shuffleId: Int, reduceId: Int, status: MergeStatus) {
+    shuffleStatuses(shuffleId).addMergeResult(reduceId, status)
+  }
+
+  def registerMergeResults(shuffleId: Int, statuses: Seq[(Int, MergeStatus)]): Unit = {
+    statuses.foreach {
+      case (reduceId, status) => registerMergeResult(shuffleId, reduceId, status)
+    }
+  }
+
+  def unregisterMergeResult(shuffleId: Int, reduceId: Int, bmAddress: BlockManagerId) {

Review comment:
       This's only called in tests yet. Do you plan to use it in the following PR? Although, I think we can also complete it in this PR.

##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -554,7 +726,11 @@ private[spark] class MapOutputTrackerMaster(
   def containsShuffle(shuffleId: Int): Boolean = shuffleStatuses.contains(shuffleId)
 
   def getNumAvailableOutputs(shuffleId: Int): Int = {
-    shuffleStatuses.get(shuffleId).map(_.numAvailableOutputs).getOrElse(0)
+    shuffleStatuses.get(shuffleId).map(_.numAvailableMapOutputs).getOrElse(0)
+  }
+
+  def getNumAvailableMergeResults(shuffleId: Int): Int = {

Review comment:
       Is this necessary? (except the tests)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org