Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/09/18 18:49:32 UTC

[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE]Remove push-based shuffle data after query finished

mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r973756877


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java:
##########
@@ -255,4 +255,17 @@ public void getMergedBlockMeta(
       MergedBlocksMetaListener listener) {
     throw new UnsupportedOperationException();
   }
+
+  /**
+   * Remove the shuffle merge data in shuffle services
+   *
+   * @param host the host of the remote node.
+   * @param port the port of the remote node.
+   * @param shuffleId shuffle id.
+   *
+   * @since 3.4.0
+   */
+  public boolean removeShuffleMerge(String host, int port, int shuffleId) {

Review Comment:
   Pass the `shuffleMergeId` as well here (and everywhere else below, as relevant).
   This will make sure the protocol/API is extensible for future use if/when we want to clean up data for a specific merge id.
   
   Use a specific value to indicate cleanup for all/any shuffle merge ids (for example `-1`, since we start with `0` for the shuffle merge id and bump it up for new stage attempts of indeterminate stages).
   
   For now, we can fail the request if `shuffleMergeId` is not `-1` (in `RemoteBlockPushResolver.java`) and progressively add support for cleaning up a specific merge id in future versions as we need it: this way, protocol changes are not required at that point in time.
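   
   For concreteness, a minimal sketch of what the extended client-side signature could look like under this suggestion; the class name and the sentinel constant below are illustrative, not the PR's final API:
   
   ```
   // Illustrative sketch only: an extended removeShuffleMerge that also carries the
   // shuffle merge id, with -1 meaning "all merge ids for this shuffle".
   public abstract class BlockStoreClientSketch {
     // Sentinel value: remove merged data for every merge id of the given shuffle.
     public static final int DELETE_ALL_MERGED_DATA = -1;
   
     /**
      * Remove merged shuffle data held by the shuffle service at (host, port).
      *
      * @param shuffleMergeId a specific merge id, or DELETE_ALL_MERGED_DATA (-1) to
      *                       remove data for all merge ids of this shuffle.
      */
     public boolean removeShuffleMerge(String host, int port, int shuffleId, int shuffleMergeId) {
       throw new UnsupportedOperationException();
     }
   }
   ```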



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveShuffleMerge.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.spark.network.protocol.Encoders;
+
+/**
+ * Remove the merged data for a given shuffle.
+ * Returns {@link Boolean}
+ *
+ * @since 3.4.0
+ */
+public class RemoveShuffleMerge extends BlockTransferMessage {
+  public final String appId;

Review Comment:
   Add `appAttemptId` as well here.
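   
   For reference, the message's field layout could end up looking roughly like this once `appAttemptId` (and the `shuffleMergeId` from the earlier comment) are folded in; the class name is illustrative, and the `int` attempt id is an assumption modeled on other push-based shuffle messages:
   
   ```
   // Illustrative field layout only; not the final RemoveShuffleMerge definition.
   public class RemoveShuffleMergeFieldsSketch {
     public final String appId;
     public final int appAttemptId;   // assumption: int attempt id, as in other push-based shuffle messages
     public final int shuffleId;
     public final int shuffleMergeId; // -1 = all merge ids, per the earlier comment
   
     public RemoveShuffleMergeFieldsSketch(
         String appId, int appAttemptId, int shuffleId, int shuffleMergeId) {
       this.appId = appId;
       this.appAttemptId = appAttemptId;
       this.shuffleId = shuffleId;
       this.shuffleMergeId = shuffleMergeId;
     }
   }
   ```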



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -393,6 +393,20 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
     }
   }
 
+  @Override
+  public void removeShuffleMerge(String appId, int shuffleId) {

Review Comment:
   When adding support for `shuffleMergeId`, follow the same pattern as `finalizeShuffleMerge` - there are a few corner cases here, and that method handles them.
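   
   For illustration, a rough outline of the kind of corner-case handling that pattern implies once `shuffleMergeId` support is added; the class, the helper methods, and the exact policy choices below are placeholders, not `RemoteBlockPushResolver`'s actual internals:
   
   ```
   // Placeholder outline only; the helper methods are stubs, not the real resolver API.
   public class RemoveShuffleMergeOutline {
     public static final int ALL_MERGE_IDS = -1;  // sentinel from the earlier comment
   
     // Latest merge id registered for (appId, shuffleId); stubbed for illustration.
     private int currentShuffleMergeId(String appId, int shuffleId) { return 0; }
   
     // Deletes merged shuffle files for one merge id; stubbed for illustration.
     private void deleteMergedFiles(String appId, int shuffleId, int shuffleMergeId) { }
   
     public void removeShuffleMerge(String appId, int shuffleId, int shuffleMergeId) {
       int current = currentShuffleMergeId(appId, shuffleId);
       if (shuffleMergeId == ALL_MERGE_IDS || shuffleMergeId == current) {
         // Usual case: drop the merged data for the current merge id.
         deleteMergedFiles(appId, shuffleId, current);
       } else if (shuffleMergeId < current) {
         // Request for an older stage attempt: its data was already superseded when the
         // newer merge id was registered, so one option is to treat this as a no-op.
       } else {
         // A merge id newer than anything registered is unexpected; reject it, in the
         // spirit of how finalizeShuffleMerge validates merge ids.
         throw new IllegalArgumentException(
           "Unexpected shuffleMergeId " + shuffleMergeId + " for shuffle " + shuffleId);
       }
     }
   }
   ```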



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveShuffleMerge.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.spark.network.protocol.Encoders;
+
+/**
+ * Remove the merged data for a given shuffle.
+ * Returns {@link Boolean}
+ *
+ * @since 3.4.0
+ */
+public class RemoveShuffleMerge extends BlockTransferMessage {
+  public final String appId;
+  public final int shuffleId;
+
+  public RemoveShuffleMerge(String appId, int shuffleId) {
+    this.appId = appId;
+    this.shuffleId = shuffleId;
+  }
+
+  @Override
+  protected Type type() {
+    return Type.REMOVE_SHUFFLE_MERGE;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(appId, shuffleId);
+  }
+
+  @Override
+  public String toString() {
+    return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
+      .append("appId", appId)
+      .append("shuffleId", shuffleId)
+      .toString();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other != null && other instanceof RemoveShuffleMerge) {
+      RemoveShuffleMerge o = (RemoveShuffleMerge) other;
+      return Objects.equal(appId, o.appId)
+        && shuffleId == o.shuffleId;

Review Comment:
   nit: Check primitives first before objects
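   
   A possible drop-in form of that `equals` for the class quoted above, comparing the cheap primitive first (as a side note, `instanceof` already returns `false` for `null`, so the explicit null check can be dropped too):
   
   ```
   @Override
   public boolean equals(Object other) {
     if (other instanceof RemoveShuffleMerge) {
       RemoveShuffleMerge o = (RemoveShuffleMerge) other;
       // Cheap int comparison first, then the object comparison.
       return shuffleId == o.shuffleId && Objects.equal(appId, o.appId);
     }
     return false;
   }
   ```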



##########
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##########
@@ -364,8 +364,16 @@ class BlockManagerMasterEndpoint(
         }
       }.getOrElse(Seq.empty)
 
+    val removeShuffleMergeFromShuffleServicesFutures =

Review Comment:
   Do this only when push-based shuffle is enabled, to avoid the `getShufflePushMergerLocations` call.



##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -1395,16 +1393,24 @@ private[spark] class DAGScheduler(
   }
 
   private def getAndSetShufflePushMergerLocations(stage: ShuffleMapStage): Seq[BlockManagerId] = {
-    val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations(
-      stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId)
-    if (mergerLocs.nonEmpty) {
-      stage.shuffleDep.setMergerLocs(mergerLocs)
+    stage.shuffleDep.synchronized {
+      val oldMergeLocs = stage.shuffleDep.getMergerLocs
+      if (oldMergeLocs.isEmpty) {
+        val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations(
+          stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId)
+        if (mergerLocs.nonEmpty) {
+          stage.shuffleDep.setMergerLocs(mergerLocs)
+          mapOutputTracker.registerShufflePushMergerLocations(
+            stage.shuffleDep.shuffleId, mergerLocs)
+        }
+        logDebug(s"Shuffle merge locations for shuffle ${stage.shuffleDep.shuffleId} with" +
+          s" shuffle merge ${stage.shuffleDep.shuffleMergeId} is" +
+          s" ${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}")
+        mergerLocs
+      } else {
+        oldMergeLocs
+      }
     }
-
-    logDebug(s"Shuffle merge locations for shuffle ${stage.shuffleDep.shuffleId} with" +
-      s" shuffle merge ${stage.shuffleDep.shuffleMergeId} is" +
-      s" ${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}")
-    mergerLocs
   }

Review Comment:
   We don't need to synchronize here - it is within the context of the `DAGScheduler` lock.
   Given this, the change would be simpler:
   
   ```
   if (stage.shuffleDep.getMergerLocs.nonEmpty) return
   val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations( ... )
   if (mergerLocs.nonEmpty) {
     stage.shuffleDep.setMergerLocs(mergerLocs)
     mapOutputTracker.registerShufflePushMergerLocations( ... )
     logDebug( ...)
   }
   ```
   
   
   We can also rename the method to `configureShufflePushMergerLocations`, since we no longer need to return the value.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

