Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/04/06 12:20:03 UTC

[GitHub] [spark] attilapiros commented on a change in pull request #31876: [SPARK-34942][API][CORE] Abstract Location in MapStatus to enable support for custom storage

attilapiros commented on a change in pull request #31876:
URL: https://github.com/apache/spark/pull/31876#discussion_r607790506



##########
File path: core/src/main/java/org/apache/spark/shuffle/api/Location.java
##########
@@ -0,0 +1,28 @@
+package org.apache.spark.shuffle.api;
+
+import org.apache.spark.annotation.Private;
+
+import java.io.Externalizable;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+
+/**
+ * :: Private ::
+ * An interface for plugging in the location of shuffle files, in order to support storing shuffle
+ * data in different storage systems, e.g., BlockManager, HDFS, S3. It is generated by
+ * {@link ShuffleMapOutputWriter} after writing a shuffle data file and used by ShuffleMapOutputReader
+ * to read the shuffle data.
+ *
+ * Since the location is returned by {@link ShuffleMapOutputWriter#commitAllPartitions()} on the
+ * executor and is sent to the driver, users must ensure the location is serializable by:
+ *
+ *  - providing a 0-arg constructor
+ *  - implementing {@link java.io.Externalizable#readExternal(ObjectInput)} for deserialization
+ *  - implementing {@link java.io.Externalizable#writeExternal(ObjectOutput)} for serialization
+ *

Review comment:
       Please mention the `equals` method!
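As a minimal sketch of an implementation satisfying the contract in this Javadoc plus the requested `equals` requirement: it assumes `Location` is an interface extending `java.io.Externalizable` (the diff's imports suggest this, but the stand-in interface below is hypothetical), and `HdfsLocation`, its `path` field, and `LocationRoundTrip` are illustrative names, not part of the PR. It provides the public 0-arg constructor that `Externalizable` deserialization calls, both `Externalizable` methods, and a value-based `equals` with a matching `hashCode` (as the `Object.equals` contract requires), then round-trips an instance through Java serialization to mimic the executor-to-driver hop.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.util.Objects;

// Demo driver, declared first so that single-file launch (`java File.java`) runs its main.
class LocationRoundTrip {
  // Serializes and deserializes a location in memory, standing in for the
  // executor-to-driver hop.
  static Location roundTrip(Location loc) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(loc);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (Location) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("location round trip failed", e);
    }
  }

  public static void main(String[] args) {
    Location original = new HdfsLocation("/shuffle/shuffle_0_1_0.data");
    Location copy = roundTrip(original);
    System.out.println(copy + " equals original: " + copy.equals(original));
  }
}

// Hypothetical stand-in for org.apache.spark.shuffle.api.Location; the real
// interface may declare additional methods.
interface Location extends Externalizable {}

// Hypothetical location pointing at a shuffle file stored on HDFS.
class HdfsLocation implements Location {
  private String path;

  // Required for Externalizable: deserialization invokes this public 0-arg
  // constructor and then calls readExternal on the new instance.
  public HdfsLocation() {}

  HdfsLocation(String path) {
    this.path = path;
  }

  @Override
  public void writeExternal(ObjectOutput out) throws IOException {
    out.writeUTF(path);
  }

  @Override
  public void readExternal(ObjectInput in) throws IOException {
    path = in.readUTF();
  }

  // Value equality, so a copy deserialized on the driver equals the original.
  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof HdfsLocation)) return false;
    return Objects.equals(path, ((HdfsLocation) o).path);
  }

  // Kept consistent with equals, e.g. for use as a hash map key.
  @Override
  public int hashCode() {
    return Objects.hashCode(path);
  }

  @Override
  public String toString() {
    return "HdfsLocation(" + path + ")";
  }
}
```

Without the `equals` override, the deserialized copy would be a different object and compare unequal to the original, which is exactly what the `location == loc` check further down in this review depends on.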

##########
File path: core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
##########
@@ -189,7 +189,8 @@ private[spark] object JsonProtocol {
   }
 
   def blockManagerAddedToJson(blockManagerAdded: SparkListenerBlockManagerAdded): JValue = {
-    val blockManagerId = blockManagerIdToJson(blockManagerAdded.blockManagerId)
+    val blockManagerId = blockManagerIdToJson(

Review comment:
       Please revert this unnecessary change.

##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -146,9 +147,9 @@ private class ShuffleStatus(numPartitions: Int) extends Logging {
    * This is a no-op if there is no registered map output or if the registered output is from a
    * different block manager.
    */
-  def removeMapOutput(mapIndex: Int, bmAddress: BlockManagerId): Unit = withWriteLock {
-    logDebug(s"Removing existing map output ${mapIndex} ${bmAddress}")
-    if (mapStatuses(mapIndex) != null && mapStatuses(mapIndex).location == bmAddress) {
+  def removeMapOutput(mapIndex: Int, loc: Location): Unit = withWriteLock {
+    logDebug(s"Removing existing map output $mapIndex $loc")
+    if (mapStatuses(mapIndex) != null && mapStatuses(mapIndex).location == loc) {

Review comment:
       Because of the `mapStatuses(mapIndex).location == loc` comparison, classes implementing the `Location` interface must implement the `equals` method too.
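To illustrate why, here is a minimal Java sketch of the same guard. Scala's `==` on reference types is a null-safe call to `equals`, so `java.util.Objects.equals` mirrors it. `ShuffleStatusSketch`, `IdentityLocation`, and `ValueLocation` are hypothetical names; the sketch simplifies `ShuffleStatus` to one location per map index and makes `removeMapOutput` return whether anything was removed (the real method returns `Unit`). A location with the same content but a different instance only matches when `equals` is overridden; otherwise the removal silently becomes a no-op.

```java
import java.util.Objects;

// Hypothetical, simplified model of ShuffleStatus that stores one location per
// map index instead of a full MapStatus. Declared first so that single-file
// launch (`java File.java`) runs its main.
final class ShuffleStatusSketch {
  private final Object[] locations;

  ShuffleStatusSketch(int numMaps) {
    locations = new Object[numMaps];
  }

  void addMapOutput(int mapIndex, Object loc) {
    locations[mapIndex] = loc;
  }

  // Mirrors `mapStatuses(mapIndex) != null && mapStatuses(mapIndex).location == loc`.
  // Returns whether something was removed (the real method returns Unit).
  boolean removeMapOutput(int mapIndex, Object loc) {
    if (locations[mapIndex] != null && Objects.equals(locations[mapIndex], loc)) {
      locations[mapIndex] = null;
      return true;
    }
    return false; // no-op: nothing registered, or the locations are not equal
  }

  public static void main(String[] args) {
    ShuffleStatusSketch status = new ShuffleStatusSketch(2);
    status.addMapOutput(0, new IdentityLocation("host-a"));
    status.addMapOutput(1, new ValueLocation("host-a"));
    // Same content but new instances, as with a location rebuilt from a message.
    System.out.println(status.removeMapOutput(0, new IdentityLocation("host-a"))); // false
    System.out.println(status.removeMapOutput(1, new ValueLocation("host-a"))); // true
  }
}

// Hypothetical location that does not override equals: Object.equals is identity.
final class IdentityLocation {
  final String host;

  IdentityLocation(String host) {
    this.host = host;
  }
}

// Hypothetical location with value-based equals and a consistent hashCode.
final class ValueLocation {
  final String host;

  ValueLocation(String host) {
    this.host = host;
  }

  @Override
  public boolean equals(Object o) {
    return o instanceof ValueLocation && Objects.equals(host, ((ValueLocation) o).host);
  }

  @Override
  public int hashCode() {
    return Objects.hashCode(host);
  }
}
```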




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


