Posted to issues@celeborn.apache.org by "wangshengjie123 (via GitHub)" <gi...@apache.org> on 2024/03/11 02:46:39 UTC

[PR] [CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [incubator-celeborn]

wangshengjie123 opened a new pull request, #2373:
URL: https://github.com/apache/incubator-celeborn/pull/2373

   
   ### What changes were proposed in this pull request?
   Add logic that avoids sorting shuffle files in Reduce mode when optimizing skew partitions.
   
   
   ### Why are the changes needed?
   Currently, reading skew-partition shuffle files in Reduce mode requires sorting them first. We have observed shuffle sorting timeouts and performance issues as a result.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   Cluster tests and UTs.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [incubator-celeborn]

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on code in PR #2373:
URL: https://github.com/apache/incubator-celeborn/pull/2373#discussion_r1523263904


##########
client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java:
##########
@@ -87,6 +96,41 @@ public static CelebornInputStream create(
     }
   }
 
+  public static PartitionLocation[] getSkewPartitionLocations(
+      PartitionLocation[] locations, int subPartitionSize, int subPartitionIndex) {
+    Set<PartitionLocation> sortSet =
+        new TreeSet<>(
+            (o1, o2) -> {
+              if (o1.getStorageInfo().fileSize > o2.getStorageInfo().fileSize) {
+                return 1;
+              } else if (o1.getStorageInfo().fileSize < o2.getStorageInfo().fileSize) {
+                return -1;
+              } else {
+                return o1.hashCode() - o2.hashCode();
+              }
+            });
+    sortSet.addAll(Arrays.asList(locations));
+    PartitionLocation[] orderedPartitionLocations = sortSet.toArray(new PartitionLocation[0]);
+
+    List<PartitionLocation> result = new LinkedList<>();
+
+    int step = locations.length / subPartitionSize;
+
+    // if partition location is [1,2,3,4,5,6,7,8,9,10], and skew partition split to 3 task:

Review Comment:
   Thanks @mridulm for the explanation. I hadn't understood the idea and was thinking of the naive approach :)





Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [incubator-celeborn]

Posted by "wangshengjie123 (via GitHub)" <gi...@apache.org>.
wangshengjie123 commented on code in PR #2373:
URL: https://github.com/apache/incubator-celeborn/pull/2373#discussion_r1523250802


##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -136,30 +137,37 @@ protected Compressor initialValue() {
 
   private final ReviveManager reviveManager;
 
+  private final boolean dataPushFailureTrackingEnabled;
+
   protected static class ReduceFileGroups {
     public Map<Integer, Set<PartitionLocation>> partitionGroups;
+    public Set<PushFailedBatch> pushFailedBatchSet;

Review Comment:
   Got it, I'll update this later.





Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [celeborn]

Posted by "s0nskar (via GitHub)" <gi...@apache.org>.
s0nskar commented on PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#issuecomment-2037365732

   Also, I think this issue is not limited to ResultStage; it can happen with ShuffleMapStage as well in some complex cases. Consider another scenario:
   
   `ShuffleMapStage1 -----> ShuffleMapStage2 -----> `
   
   - As in the example above, say a skewed partition P0 is generated by `ShuffleMapStage1`.
   - ShuffleMapStage2 gets a FetchFailure while reading sub-partitions of ShuffleMapStage1.
   - ShuffleMapStage1 will be recomputed and its shuffle outputs will be cleared.
   - Only the missing tasks of ShuffleMapStage2 will be retried, again causing the same issue.
   
   In this case, though, we can roll back the whole lineage up to this point instead of failing the job, similar to what vanilla Spark does, but this will be very expensive.
   
   




Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [incubator-celeborn]

Posted by "wangshengjie123 (via GitHub)" <gi...@apache.org>.
wangshengjie123 commented on code in PR #2373:
URL: https://github.com/apache/incubator-celeborn/pull/2373#discussion_r1536775426


##########
common/src/main/java/org/apache/celeborn/common/write/PushFailedBatch.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.write;
+
+import java.io.Serializable;
+
+import com.google.common.base.Objects;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+public class PushFailedBatch implements Serializable {
+
+  private int mapId;
+  private int attemptId;
+  private int batchId;
+  private int epoch;
+  private int reduceId;
+
+  public int getMapId() {
+    return mapId;
+  }
+
+  public void setMapId(int mapId) {
+    this.mapId = mapId;
+  }
+
+  public int getAttemptId() {
+    return attemptId;
+  }
+
+  public void setAttemptId(int attemptId) {
+    this.attemptId = attemptId;
+  }
+
+  public int getBatchId() {
+    return batchId;
+  }
+
+  public void setBatchId(int batchId) {
+    this.batchId = batchId;
+  }
+
+  public int getReduceId() {
+    return reduceId;
+  }
+
+  public void setReduceId(int reduceId) {
+    this.reduceId = reduceId;
+  }
+
+  public int getEpoch() {
+    return epoch;
+  }
+
+  public void setEpoch(int epoch) {
+    this.epoch = epoch;
+  }
+
+  public PushFailedBatch(int mapId, int attemptId, int batchId, int reduceId, int epoch) {
+    this.mapId = mapId;
+    this.attemptId = attemptId;
+    this.batchId = batchId;
+    this.reduceId = reduceId;
+    this.epoch = epoch;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof PushFailedBatch)) {
+      return false;
+    }
+    PushFailedBatch o = (PushFailedBatch) other;
+    return super.equals(o)

Review Comment:
   Fixed, thanks a lot
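
The review comment that prompted this fix is not quoted here. One hazard that is visible in the hunk: the class as quoted only implements `Serializable`, so `super.equals(o)` resolves to `Object.equals`, which is reference identity, and combining it with field comparisons would make two distinct instances with identical fields unequal. A minimal sketch of field-based equality follows. The class name `BatchKey` is hypothetical, it uses `java.util.Objects` rather than the Guava `Objects` imported in the diff, and it is not the code that was merged.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for PushFailedBatch; the name BatchKey is illustrative only.
final class BatchKey {
  private final int mapId;
  private final int attemptId;
  private final int batchId;
  private final int reduceId;
  private final int epoch;

  BatchKey(int mapId, int attemptId, int batchId, int reduceId, int epoch) {
    this.mapId = mapId;
    this.attemptId = attemptId;
    this.batchId = batchId;
    this.reduceId = reduceId;
    this.epoch = epoch;
  }

  @Override
  public boolean equals(Object other) {
    if (this == other) {
      return true;
    }
    if (!(other instanceof BatchKey)) {
      return false;
    }
    BatchKey o = (BatchKey) other;
    // Compare fields only; Object.equals would require the very same instance.
    return mapId == o.mapId
        && attemptId == o.attemptId
        && batchId == o.batchId
        && reduceId == o.reduceId
        && epoch == o.epoch;
  }

  @Override
  public int hashCode() {
    // Uses the same fields as equals so hash-based sets can find equal keys.
    return Objects.hash(mapId, attemptId, batchId, reduceId, epoch);
  }

  public static void main(String[] args) {
    Set<BatchKey> failed = new HashSet<>();
    failed.add(new BatchKey(1, 0, 7, 3, 0));
    System.out.println(failed.contains(new BatchKey(1, 0, 7, 3, 0))); // true
    System.out.println(failed.contains(new BatchKey(1, 0, 8, 3, 0))); // false
  }
}
```

Keeping `equals` and `hashCode` on the same field set matters here because the diff stores these objects in a `Set<PushFailedBatch>`.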





Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [incubator-celeborn]

Posted by "wangshengjie123 (via GitHub)" <gi...@apache.org>.
wangshengjie123 commented on code in PR #2373:
URL: https://github.com/apache/incubator-celeborn/pull/2373#discussion_r1536773599


##########
client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala:
##########
@@ -291,7 +307,12 @@ class ReducePartitionCommitHandler(
               val returnedMsg = GetReducerFileGroupResponse(
                 StatusCode.SUCCESS,
                 reducerFileGroupsMap.getOrDefault(shuffleId, JavaUtils.newConcurrentHashMap()),
-                getMapperAttempts(shuffleId))
+                getMapperAttempts(shuffleId),
+                pushFailedBatches =
+                  shufflePushFailedBatches.getOrDefault(
+                    shuffleId,
+                    JavaUtils.newConcurrentHashMap()).values().asScala.flatMap(x =>
+                    x.asScala.toSet[PushFailedBatch]).toSet.asJava)

Review Comment:
   fixed, thanks





Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [celeborn]

Posted by "s0nskar (via GitHub)" <gi...@apache.org>.
s0nskar commented on PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#issuecomment-2037329644

   @pan3793 This is not a problem if we maintain the concept of mapIndex ranges, since Spark will then always read deterministic output for each sub-partition.
   
   Because vanilla Spark always reads deterministic output thanks to the mapIndex range filter, it does not face this issue. With this approach, sub-partition data will be non-deterministic across stage attempts. Failing would be the only option in such cases until Spark supports ResultStage rollback.




Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [celeborn]

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#issuecomment-2037721273

   > @wangshengjie123 Is there any doc or ticket explaining this approach? Also for the sort based approach that you mentioned.
   
   The sort-based approach is roughly as follows:
   1. Each sub-reducer reads from all partition splits for data within its map range.
   2. The first read request triggers sorting of the partition split file by map ID, so that each IO is sequential.
   
   ![image](https://github.com/apache/celeborn/assets/948245/09949028-d6f8-4966-be3c-51a027f69524)
   ![image](https://github.com/apache/celeborn/assets/948245/c8ff1791-91cb-4272-8eea-3f3db9895504)
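
The two steps above can be sketched with an in-memory model. This is a simplified illustration, not Celeborn's worker code: `SortedSplitSketch`, `Batch`, and `read` are hypothetical names, and a real worker would sort on disk and seek through an index rather than scan a list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical in-memory model of one partition split file.
final class SortedSplitSketch {
  static final class Batch {
    final int mapId;
    final String payload;

    Batch(int mapId, String payload) {
      this.mapId = mapId;
      this.payload = payload;
    }
  }

  private final List<Batch> batches;
  private boolean sorted = false;

  SortedSplitSketch(List<Batch> arrivalOrder) {
    this.batches = new ArrayList<>(arrivalOrder);
  }

  // Returns payloads with startMapIndex <= mapId < endMapIndex.
  List<String> read(int startMapIndex, int endMapIndex) {
    if (!sorted) {
      // Step 2: the first read triggers a one-time (stable) sort by map id.
      batches.sort(Comparator.comparingInt(b -> b.mapId));
      sorted = true;
    }
    List<String> out = new ArrayList<>();
    for (Batch b : batches) {
      if (b.mapId >= endMapIndex) {
        break; // sorted, so no later batch can fall inside the range
      }
      if (b.mapId >= startMapIndex) {
        out.add(b.payload);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    SortedSplitSketch split = new SortedSplitSketch(Arrays.asList(
        new Batch(2, "c1"), new Batch(0, "a1"), new Batch(1, "b1"),
        new Batch(0, "a2"), new Batch(2, "c2")));
    // Step 1: each sub-reducer asks every split for its own map range.
    System.out.println(split.read(0, 2)); // [a1, a2, b1]
    System.out.println(split.read(2, 3)); // [c1, c2]
  }
}
```

After the sort, each map range occupies one contiguous run of the file, which is why the reads become sequential. The cost is the sort itself, which is what this PR tries to avoid.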
   




Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [celeborn]

Posted by "wangshengjie123 (via GitHub)" <gi...@apache.org>.
wangshengjie123 commented on PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#issuecomment-2051387119

   > Based on my current read, this does have correctness implications. I would suggest we should do either or all of the following:
   > 
   > a) If recomputation happens, we should fail the stage and not allow retries - this will prevent data loss.
   > 
   > b) We should recommend enabling replication to leverage this feature - this minimizes the risk of data loss which would trigger recomputation.
   > 
   > Thoughts ?
   > 
   > Also, how does this feature interact with `celeborn.client.shuffle.rangeReadFilter.enabled` ?
   
   Currently, if this PR is enabled, the shuffle client won't actually apply rangeReadFilter, but we can avoid enabling rangeReadFilter. Maybe we could disable rangeReadFilter and mark the shuffle stage INDETERMINATE at the shuffle level.




Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [celeborn]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#issuecomment-2076353227

   > Unfortunately we don't know whether a partition split will be read in a map-range fashion until the read request comes : (
   
   In general, this is going to be very close to 100%. It will typically be lower when the reducer stage is also a result stage and computes only a subset of partitions (like `take`, etc).
   
   > BTW, even though we figure out a way, sorting some files increases the memory/disk burden on worker (maybe negligible if worker is under low load)
   
   Completely agree! The overall cost won't change, but the impact can be spread out over a longer duration and executed on the workers at a lower priority, so that when the request comes in, the sorted file is already materialized.




Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [incubator-celeborn]

Posted by "RexXiong (via GitHub)" <gi...@apache.org>.
RexXiong commented on code in PR #2373:
URL: https://github.com/apache/incubator-celeborn/pull/2373#discussion_r1529991243


##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -1393,7 +1414,13 @@ public void onSuccess(ByteBuffer response) {
                     Arrays.toString(partitionIds),
                     groupedBatchId,
                     Arrays.toString(batchIds));
-
+                if (dataPushFailureTrackingEnabled) {

Review Comment:
   There is no need to do this for HARD_SPLIT, as the worker never writes the batch on HARD_SPLIT. cc @waitinfuture



##########
client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala:
##########
@@ -244,6 +264,18 @@ class ReducePartitionCommitHandler(
       val attempts = shuffleMapperAttempts.get(shuffleId)
       if (attempts(mapId) < 0) {
         attempts(mapId) = attemptId
+        if (null != pushFailedBatches && !pushFailedBatches.isEmpty) {
+          val pushFailedBatchesMap = shufflePushFailedBatches.computeIfAbsent(
+            shuffleId,
+            newShuffleId2PushFailedBatchMapFunc)
+          pushFailedBatches.forEach((k, v) => {
+            val partitionPushFailedBatches = pushFailedBatchesMap.computeIfAbsent(
+              k,
+              uniqueId2PushFailedBatchMapFunc)
+            partitionPushFailedBatches.addAll(v)
+          })
+          pushFailedBatchesMap.get(pushFailedBatches)

Review Comment:
   Seems `pushFailedBatchesMap.get(pushFailedBatches)` is useless.



##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -4671,4 +4671,13 @@ object CelebornConf extends Logging {
       .version("0.5.0")
       .intConf
       .createWithDefault(10000)
+
+  val CLIENT_DATA_PUSH_FAILURE_TRACKING_ENABLED: ConfigEntry[Boolean] =

Review Comment:
   Maybe we can use another configuration name for enabling the skew join optimization. `CLIENT_DATA_PUSH_FAILURE_TRACKING_ENABLED` doesn't feel very straightforward.



##########
client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java:
##########
@@ -615,6 +663,17 @@ private boolean fillBuffer() throws IOException {
 
           // de-duplicate
           if (attemptId == attempts[mapId]) {
+            if (splitSkewPartitionWithoutMapRange) {

Review Comment:
   1. We can reuse one PushFailedBatch object and update its inner fields to improve memory efficiency.
   2. It would be better to check whether failedBatches is empty first. We may never need to check failed batches at all.
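
Both suggestions can be sketched together. This is an illustration under stated assumptions, not the PR's code: `FailedBatchFilterSketch`, `Key`, and `shouldSkip` are hypothetical names, `Key` carries fewer fields than `PushFailedBatch`, and the failed-batch set is assumed not to change after the filter is built.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical sketch; Key stands in for PushFailedBatch with fewer fields.
final class FailedBatchFilterSketch {
  static final class Key {
    int mapId;
    int attemptId;
    int batchId;

    Key set(int mapId, int attemptId, int batchId) {
      this.mapId = mapId;
      this.attemptId = attemptId;
      this.batchId = batchId;
      return this;
    }

    @Override
    public boolean equals(Object other) {
      if (!(other instanceof Key)) {
        return false;
      }
      Key o = (Key) other;
      return mapId == o.mapId && attemptId == o.attemptId && batchId == o.batchId;
    }

    @Override
    public int hashCode() {
      return Objects.hash(mapId, attemptId, batchId);
    }
  }

  private final Set<Key> failedBatches;
  // Suggestion 2: decide once whether any lookup is needed at all.
  private final boolean checkNeeded;
  // Suggestion 1: one probe object reused for lookups. It is never inserted
  // into the set, so mutating it cannot corrupt the set's hashing.
  private final Key probe = new Key();

  FailedBatchFilterSketch(Set<Key> failedBatches) {
    this.failedBatches = failedBatches;
    // Assumes the set is not modified after construction.
    this.checkNeeded = !failedBatches.isEmpty();
  }

  // Not thread-safe: the shared probe is mutated on every call.
  boolean shouldSkip(int mapId, int attemptId, int batchId) {
    return checkNeeded && failedBatches.contains(probe.set(mapId, attemptId, batchId));
  }

  public static void main(String[] args) {
    Set<Key> failed = new HashSet<>();
    failed.add(new Key().set(1, 0, 5));
    FailedBatchFilterSketch filter = new FailedBatchFilterSketch(failed);
    System.out.println(filter.shouldSkip(1, 0, 5)); // true
    System.out.println(filter.shouldSkip(1, 0, 6)); // false
  }
}
```

The reused probe avoids one allocation per batch in the read loop, and the empty check skips hashing entirely in the common case where no push failed.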





Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [incubator-celeborn]

Posted by "wangshengjie123 (via GitHub)" <gi...@apache.org>.
wangshengjie123 commented on PR #2373:
URL: https://github.com/apache/incubator-celeborn/pull/2373#issuecomment-2001897791

   > > Thanks @wangshengjie123 for this PR! I left some comments. In addition, is the small change to Spark missing?
   > 
   > HI, @wangshengjie123 Can you please update the Spark patch? It will help the reviewers understand this PR better. Thanks!
   
   




Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [celeborn]

Posted by "s0nskar (via GitHub)" <gi...@apache.org>.
s0nskar commented on code in PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#discussion_r1552832095


##########
assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_3.patch:
##########
@@ -0,0 +1,78 @@
+From 39eeab2426f9676580e4e19c8b079e1967081c7d Mon Sep 17 00:00:00 2001
+From: wangshengjie <wa...@xiaomi.com>
+Date: Sun, 24 Mar 2024 19:51:05 +0800
+Subject: [PATCH] [SQL] Handle skew partitions with Celeborn
+
+---
+ .../org/apache/spark/sql/internal/SQLConf.scala      | 10 ++++++++++
+ .../execution/adaptive/ShufflePartitionsUtil.scala   | 12 +++++++++++-
+ 2 files changed, 21 insertions(+), 1 deletion(-)
+
+diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+index af03ad9a4cb..1e55af89160 100644
+--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
++++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+@@ -3784,6 +3784,13 @@ object SQLConf {
+     .booleanConf
+     .createWithDefault(false)
+ 
++  val CELEBORN_CLIENT_DATA_PUSH_FAILURE_TRACKING_ENABLED =
++    buildConf("spark.celeborn.client.dataPushFailure.tracking.enabled")
++      .withAlternative("celeborn.client.dataPushFailure.tracking.enabled")
++      .version("3.1.2-mdh")
++      .booleanConf
++      .createWithDefault(false)
++
+   /**
+    * Holds information about keys that have been deprecated.
+    *
+@@ -4549,6 +4556,9 @@ class SQLConf extends Serializable with Logging {
+   def histogramNumericPropagateInputType: Boolean =
+     getConf(SQLConf.HISTOGRAM_NUMERIC_PROPAGATE_INPUT_TYPE)
+ 
++  def isCelebornClientPushFailedTrackingEnabled: Boolean = getConf(
++    SQLConf.CELEBORN_CLIENT_DATA_PUSH_FAILURE_TRACKING_ENABLED)
++
+   /** ********************** SQLConf functionality methods ************ */
+ 
+   /** Set Spark SQL configuration properties. */
+diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
+index af689db3379..7da6211e509 100644
+--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
++++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
+@@ -22,6 +22,8 @@ import scala.collection.mutable.ArrayBuffer
+ import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster, SparkEnv}
+ import org.apache.spark.internal.Logging
+ import org.apache.spark.sql.execution.{CoalescedPartitionSpec, PartialReducerPartitionSpec, ShufflePartitionSpec}
++import org.apache.spark.sql.internal.SQLConf
++import org.apache.spark.util.Utils
+ 
+ object ShufflePartitionsUtil extends Logging {
+   final val SMALL_PARTITION_FACTOR = 0.2
+@@ -387,6 +389,10 @@ object ShufflePartitionsUtil extends Logging {
+     val mapStartIndices = splitSizeListByTargetSize(
+       mapPartitionSizes, targetSize, smallPartitionFactor)
+     if (mapStartIndices.length > 1) {
++      // If Celeborn is enabled, split skew partitions without shuffle mapper-range reading
++      val splitSkewPartitionWithCeleborn = Utils.isCelebornEnabled(SparkEnv.get.conf) &&
++        SQLConf.get.isCelebornClientPushFailedTrackingEnabled
++
+       Some(mapStartIndices.indices.map { i =>
+         val startMapIndex = mapStartIndices(i)
+         val endMapIndex = if (i == mapStartIndices.length - 1) {
+@@ -400,7 +406,11 @@ object ShufflePartitionsUtil extends Logging {
+           dataSize += mapPartitionSizes(mapIndex)
+           mapIndex += 1
+         }
+-        PartialReducerPartitionSpec(reducerId, startMapIndex, endMapIndex, dataSize)
++        if (splitSkewPartitionWithCeleborn) {
++          PartialReducerPartitionSpec(reducerId, mapStartIndices.length, i, dataSize)

Review Comment:
   Maybe we can add a note here that these dataSize values will not be accurate. The current downstream code only uses the sum of dataSize, which should be equal, but someone might use these values differently.
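
To illustrate the point about sums, the sketch below mirrors the size accumulation loop in the quoted patch: each `dataSize` is summed over a map-index range. `RangeSizeSketch` and `sizesPerRange` are hypothetical names, and the end index used for the last range (`mapPartitionSizes.length`) is an assumption, since the quoted hunk cuts off before that line.

```java
import java.util.Arrays;

// Hypothetical helper mirroring the dataSize loop in the quoted patch.
final class RangeSizeSketch {
  static long[] sizesPerRange(long[] mapPartitionSizes, int[] mapStartIndices) {
    long[] out = new long[mapStartIndices.length];
    for (int i = 0; i < mapStartIndices.length; i++) {
      int start = mapStartIndices[i];
      // Assumption: the last range runs to the end of the map outputs.
      int end = (i == mapStartIndices.length - 1)
          ? mapPartitionSizes.length
          : mapStartIndices[i + 1];
      long sum = 0;
      for (int m = start; m < end; m++) {
        sum += mapPartitionSizes[m];
      }
      out[i] = sum;
    }
    return out;
  }

  public static void main(String[] args) {
    long[] sizes = sizesPerRange(new long[] {10, 20, 30, 40, 50}, new int[] {0, 2, 4});
    System.out.println(Arrays.toString(sizes)); // [30, 70, 50]
    System.out.println(Arrays.stream(sizes).sum()); // 150
  }
}
```

The total always equals the sum of all map output sizes for the reducer. The individual values, however, describe map ranges, and with the patch the spec carries `(mapStartIndices.length, i)` instead of a map range, so a sub-partition need not read the data its `dataSize` was computed from.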





Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [celeborn]

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#issuecomment-2041292224

   ## [Codecov](https://app.codecov.io/gh/apache/celeborn/pull/2373?dropdown=coverage&src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   Attention: Patch coverage is `1.20482%` with `82 lines` in your changes are missing coverage. Please review.
   > Project coverage is 48.51%. Comparing base [(`fc23800`)](https://app.codecov.io/gh/apache/celeborn/commit/fc238005bd8482ea41612aae6aae7e8f16f918f5?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) to head [(`ef81070`)](https://app.codecov.io/gh/apache/celeborn/pull/2373?dropdown=coverage&src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
   > Report is 19 commits behind head on main.
   
   > :exclamation: Current head ef81070 differs from pull request most recent head 687d90f. Consider uploading reports for the commit 687d90f to get more accurate results
   
   | [Files](https://app.codecov.io/gh/apache/celeborn/pull/2373?dropdown=coverage&src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Patch % | Lines |
   |---|---|---|
   | [...born/common/protocol/message/ControlMessages.scala](https://app.codecov.io/gh/apache/celeborn/pull/2373?src=pr&el=tree&filepath=common%2Fsrc%2Fmain%2Fscala%2Forg%2Fapache%2Fceleborn%2Fcommon%2Fprotocol%2Fmessage%2FControlMessages.scala&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-Y29tbW9uL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvY2VsZWJvcm4vY29tbW9uL3Byb3RvY29sL21lc3NhZ2UvQ29udHJvbE1lc3NhZ2VzLnNjYWxh) | 0.00% | [38 Missing :warning: ](https://app.codecov.io/gh/apache/celeborn/pull/2373?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   | [.../apache/celeborn/common/write/PushFailedBatch.java](https://app.codecov.io/gh/apache/celeborn/pull/2373?src=pr&el=tree&filepath=common%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fceleborn%2Fcommon%2Fwrite%2FPushFailedBatch.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9jZWxlYm9ybi9jb21tb24vd3JpdGUvUHVzaEZhaWxlZEJhdGNoLmphdmE=) | 0.00% | [24 Missing :warning: ](https://app.codecov.io/gh/apache/celeborn/pull/2373?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   | [...org/apache/celeborn/common/util/PbSerDeUtils.scala](https://app.codecov.io/gh/apache/celeborn/pull/2373?src=pr&el=tree&filepath=common%2Fsrc%2Fmain%2Fscala%2Forg%2Fapache%2Fceleborn%2Fcommon%2Futil%2FPbSerDeUtils.scala&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-Y29tbW9uL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvY2VsZWJvcm4vY29tbW9uL3V0aWwvUGJTZXJEZVV0aWxzLnNjYWxh) | 0.00% | [9 Missing :warning: ](https://app.codecov.io/gh/apache/celeborn/pull/2373?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   | [...g/apache/celeborn/common/protocol/StorageInfo.java](https://app.codecov.io/gh/apache/celeborn/pull/2373?src=pr&el=tree&filepath=common%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fceleborn%2Fcommon%2Fprotocol%2FStorageInfo.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9jZWxlYm9ybi9jb21tb24vcHJvdG9jb2wvU3RvcmFnZUluZm8uamF2YQ==) | 0.00% | [6 Missing :warning: ](https://app.codecov.io/gh/apache/celeborn/pull/2373?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   | [...va/org/apache/celeborn/common/write/PushState.java](https://app.codecov.io/gh/apache/celeborn/pull/2373?src=pr&el=tree&filepath=common%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fceleborn%2Fcommon%2Fwrite%2FPushState.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9jZWxlYm9ybi9jb21tb24vd3JpdGUvUHVzaFN0YXRlLmphdmE=) | 16.67% | [5 Missing :warning: ](https://app.codecov.io/gh/apache/celeborn/pull/2373?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   
   <details><summary>Additional details and impacted files</summary>
   
   
   ```diff
   @@            Coverage Diff             @@
   ##             main    #2373      +/-   ##
   ==========================================
   - Coverage   48.96%   48.51%   -0.45%     
   ==========================================
     Files         209      210       +1     
     Lines       13146    13186      +40     
     Branches     1135     1139       +4     
   ==========================================
   - Hits         6436     6396      -40     
   - Misses       6287     6368      +81     
   + Partials      423      422       -1     
   ```
   
   
   
   </details>
   
   




Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [celeborn]

Posted by "cfmcgrady (via GitHub)" <gi...@apache.org>.
cfmcgrady commented on code in PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#discussion_r1566224193


##########
assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_3.patch:
##########
@@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+index af03ad9a4cb..3b5c7ce4fce 100644
+--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
++++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+@@ -3784,6 +3784,12 @@ object SQLConf {
+     .booleanConf
+     .createWithDefault(false)
+ 
++  val CELEBORN_CLIENT_ADAPTIVE_OPTIMIZE_SKEWED_PARTITION_READ =
++    buildConf("spark.celeborn.client.adaptive.optimizeSkewedPartitionRead.enabled")
++      .version("3.3.0")
++      .booleanConf
++      .createWithDefault(false)
++
+   /**
+    * Holds information about keys that have been deprecated.
+    *
+@@ -4549,6 +4555,9 @@ class SQLConf extends Serializable with Logging {
+   def histogramNumericPropagateInputType: Boolean =
+     getConf(SQLConf.HISTOGRAM_NUMERIC_PROPAGATE_INPUT_TYPE)
+ 
++  def celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled: Boolean =
++    getConf(SQLConf.CELEBORN_CLIENT_ADAPTIVE_OPTIMIZE_SKEWED_PARTITION_READ)
++
+   /** ********************** SQLConf functionality methods ************ */
+ 
+   /** Set Spark SQL configuration properties. */
+diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
+index af689db3379..38e54b3ed0a 100644
+--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
++++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
+@@ -22,6 +22,8 @@ import scala.collection.mutable.ArrayBuffer
+ import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster, SparkEnv}
+ import org.apache.spark.internal.Logging
+ import org.apache.spark.sql.execution.{CoalescedPartitionSpec, PartialReducerPartitionSpec, ShufflePartitionSpec}
++import org.apache.spark.sql.internal.SQLConf
++import org.apache.spark.util.Utils
+ 
+ object ShufflePartitionsUtil extends Logging {
+   final val SMALL_PARTITION_FACTOR = 0.2
+@@ -387,6 +389,10 @@ object ShufflePartitionsUtil extends Logging {
+     val mapStartIndices = splitSizeListByTargetSize(
+       mapPartitionSizes, targetSize, smallPartitionFactor)
+     if (mapStartIndices.length > 1) {
++      // If Celeborn is enabled, split skew partitions without shuffle mapper-range reading
++      val isCelebornClientAdaptiveOptimizeSkewedPartitionReadEnabled = Utils.isCelebornEnabled(SparkEnv.get.conf) &&
++        SQLConf.get.celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled
++
+       Some(mapStartIndices.indices.map { i =>
+         val startMapIndex = mapStartIndices(i)
+         val endMapIndex = if (i == mapStartIndices.length - 1) {
+@@ -400,7 +406,11 @@ object ShufflePartitionsUtil extends Logging {
+           dataSize += mapPartitionSizes(mapIndex)
+           mapIndex += 1
+         }
+-        PartialReducerPartitionSpec(reducerId, startMapIndex, endMapIndex, dataSize)
++        if (isCelebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) {
++          PartialReducerPartitionSpec(reducerId, mapStartIndices.length, i, -1)

Review Comment:
   Updated patch target to v3.3.4 which includes SPARK-38406, avoiding prefix computation.





Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [celeborn]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#issuecomment-2077605541

   > You mean close to 100% of shuffle reads will be in map-range fashion (which means it's the case of skewed join and hits Spark's optimization), or am I misunderstanding something? In my experience the ratio is relatively low.
   
   No, I meant that if skew is detected at the driver, the chance that the partition will be fetched by a reducer task is going to be close to 100%. If we trigger this from the driver, then by the time the executor fetches shuffle data, the worker will have had a longer period of time to finish preparing the data (sort, etc).
   
   I say close to 100% because there can always be cases like `df1.join(df2).take(5)`, where not all partitions will be computed :-)
   




Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [incubator-celeborn]

Posted by "wangshengjie123 (via GitHub)" <gi...@apache.org>.
wangshengjie123 commented on code in PR #2373:
URL: https://github.com/apache/incubator-celeborn/pull/2373#discussion_r1523251711


##########
client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java:
##########
@@ -203,6 +252,8 @@ private static final class CelebornInputStreamImpl extends CelebornInputStream {
       this.shuffleCompressionEnabled =
           !conf.shuffleCompressionCodec().equals(CompressionCodec.NONE);
       this.fetchExcludedWorkerExpireTimeout = conf.clientFetchExcludedWorkerExpireTimeout();
+      this.failedBatches = failedBatchSet;
+      this.pushShuffleFailureTrackingEnabled = conf.clientPushFailureTrackingEnabled();

Review Comment:
   Yes, and I also need to fix another logic bug.





Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [incubator-celeborn]

Posted by "cfmcgrady (via GitHub)" <gi...@apache.org>.
cfmcgrady commented on PR #2373:
URL: https://github.com/apache/incubator-celeborn/pull/2373#issuecomment-1996733963

   > Thanks @wangshengjie123 for this PR! I left some comments. In addition, is the small change to Spark missing?
   
   Hi @wangshengjie123,
   Can you please update the Spark patch? It will help the reviewers understand this PR better. Thanks!
   




Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [incubator-celeborn]

Posted by "wangshengjie123 (via GitHub)" <gi...@apache.org>.
wangshengjie123 commented on PR #2373:
URL: https://github.com/apache/incubator-celeborn/pull/2373#issuecomment-2001898634

   > > Thanks @wangshengjie123 for this PR! I left some comments. In addition, is the small change to Spark missing?
   > 
   > Hi @wangshengjie123, can you please update the Spark patch? It will help the reviewers understand this PR better. Thanks!
   
   Sorry for the late reply. The PR will be updated today or tomorrow.




Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [incubator-celeborn]

Posted by "wangshengjie123 (via GitHub)" <gi...@apache.org>.
wangshengjie123 commented on code in PR #2373:
URL: https://github.com/apache/incubator-celeborn/pull/2373#discussion_r1536774241


##########
common/src/main/proto/TransportMessages.proto:
##########
@@ -625,6 +636,7 @@ message PbOpenStream {
   int32 endIndex = 4;
   int32 initialCredit = 5;
   bool readLocalShuffle = 6;
+  bool shuffleDataNeedSort = 7;

Review Comment:
   Fixed, thanks





Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [celeborn]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#discussion_r1562071909


##########
assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_3.patch:
##########
@@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+index af03ad9a4cb..3b5c7ce4fce 100644
+--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
++++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+@@ -3784,6 +3784,12 @@ object SQLConf {
+     .booleanConf
+     .createWithDefault(false)
+ 
++  val CELEBORN_CLIENT_ADAPTIVE_OPTIMIZE_SKEWED_PARTITION_READ =
++    buildConf("spark.celeborn.client.adaptive.optimizeSkewedPartitionRead.enabled")
++      .version("3.3.0")
++      .booleanConf
++      .createWithDefault(false)
++
+   /**
+    * Holds information about keys that have been deprecated.
+    *
+@@ -4549,6 +4555,9 @@ class SQLConf extends Serializable with Logging {
+   def histogramNumericPropagateInputType: Boolean =
+     getConf(SQLConf.HISTOGRAM_NUMERIC_PROPAGATE_INPUT_TYPE)
+ 
++  def celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled: Boolean =
++    getConf(SQLConf.CELEBORN_CLIENT_ADAPTIVE_OPTIMIZE_SKEWED_PARTITION_READ)
++
+   /** ********************** SQLConf functionality methods ************ */
+ 
+   /** Set Spark SQL configuration properties. */
+diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
+index af689db3379..38e54b3ed0a 100644
+--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
++++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
+@@ -22,6 +22,8 @@ import scala.collection.mutable.ArrayBuffer
+ import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster, SparkEnv}
+ import org.apache.spark.internal.Logging
+ import org.apache.spark.sql.execution.{CoalescedPartitionSpec, PartialReducerPartitionSpec, ShufflePartitionSpec}
++import org.apache.spark.sql.internal.SQLConf
++import org.apache.spark.util.Utils
+ 
+ object ShufflePartitionsUtil extends Logging {
+   final val SMALL_PARTITION_FACTOR = 0.2
+@@ -387,6 +389,10 @@ object ShufflePartitionsUtil extends Logging {
+     val mapStartIndices = splitSizeListByTargetSize(
+       mapPartitionSizes, targetSize, smallPartitionFactor)
+     if (mapStartIndices.length > 1) {
++      // If Celeborn is enabled, split skew partitions without shuffle mapper-range reading
++      val isCelebornClientAdaptiveOptimizeSkewedPartitionReadEnabled = Utils.isCelebornEnabled(SparkEnv.get.conf) &&
++        SQLConf.get.celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled
++
+       Some(mapStartIndices.indices.map { i =>
+         val startMapIndex = mapStartIndices(i)
+         val endMapIndex = if (i == mapStartIndices.length - 1) {
+@@ -400,7 +406,11 @@ object ShufflePartitionsUtil extends Logging {
+           dataSize += mapPartitionSizes(mapIndex)
+           mapIndex += 1
+         }
+-        PartialReducerPartitionSpec(reducerId, startMapIndex, endMapIndex, dataSize)
++        if (isCelebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) {
++          PartialReducerPartitionSpec(reducerId, mapStartIndices.length, i, -1)

Review Comment:
   When `isCelebornClientAdaptiveOptimizeSkewedPartitionReadEnabled` is set, can we avoid the prefix computation here entirely?
   That is, avoid
   ```
           var dataSize = 0L
           var mapIndex = startMapIndex
           while (mapIndex < endMapIndex) {
             dataSize += mapPartitionSizes(mapIndex)
             mapIndex += 1
           }
   ```
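
The suggestion can be sketched in Java roughly as follows. This is only an illustration under stated assumptions: `SplitSpec` is a hypothetical stand-in for Spark's `PartialReducerPartitionSpec`, and the `optimizedRead` parameter stands in for `isCelebornClientAdaptiveOptimizeSkewedPartitionReadEnabled`. When the flag is set, the spec carries (number of splits, split index, -1) as in the patch, so the size loop is never entered.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Spark's PartialReducerPartitionSpec (not the real class).
final class SplitSpec {
  final int reducerId;
  final int first;     // startMapIndex, or the number of splits when the flag is on
  final int second;    // endMapIndex, or the split index when the flag is on
  final long dataSize; // summed map output size, or -1 when the flag is on

  SplitSpec(int reducerId, int first, int second, long dataSize) {
    this.reducerId = reducerId;
    this.first = first;
    this.second = second;
    this.dataSize = dataSize;
  }
}

final class SkewSplitSpecs {
  // mapStartIndices: first map index of each split; mapPartitionSizes: bytes per map output.
  static List<SplitSpec> build(
      int reducerId, int[] mapStartIndices, long[] mapPartitionSizes, boolean optimizedRead) {
    List<SplitSpec> specs = new ArrayList<>();
    int numSplits = mapStartIndices.length;
    for (int i = 0; i < numSplits; i++) {
      if (optimizedRead) {
        // Mirrors the patch: no map range and no size, so nothing to sum.
        specs.add(new SplitSpec(reducerId, numSplits, i, -1L));
        continue;
      }
      int start = mapStartIndices[i];
      int end = (i == numSplits - 1) ? mapPartitionSizes.length : mapStartIndices[i + 1];
      long dataSize = 0L;
      for (int m = start; m < end; m++) {
        dataSize += mapPartitionSizes[m];
      }
      specs.add(new SplitSpec(reducerId, start, end, dataSize));
    }
    return specs;
  }
}
```

With the flag off, the map range and summed size are computed as before; with it on, the loop over `mapPartitionSizes` is skipped for every split.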






Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [celeborn]

Posted by "wangshengjie123 (via GitHub)" <gi...@apache.org>.
wangshengjie123 commented on code in PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#discussion_r1562130631


##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -135,30 +136,37 @@ protected Compressor initialValue() {
 
   private final ReviveManager reviveManager;
 
+  private final boolean dataPushFailureTrackingEnabled;
+
   protected static class ReduceFileGroups {
     public Map<Integer, Set<PartitionLocation>> partitionGroups;
+    public Map<String, Set<PushFailedBatch>> pushFailedBatches;

Review Comment:
   Yes. We may retry pushing the data, but the push may have succeeded on the primary peer and failed on the replica peer. We want to exclude such a batch to avoid reading the same data twice.
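
A minimal Java sketch of the exclusion described above, for one partition location. The `BatchKey` class and its fields (`mapId`, `attemptId`, `batchId`) are assumptions made for illustration; they are not taken from Celeborn's `PushFailedBatch`, whose actual definition is not shown in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Set;

// Hypothetical batch identifier; the field names are illustrative only.
final class BatchKey {
  final int mapId;
  final int attemptId;
  final int batchId;

  BatchKey(int mapId, int attemptId, int batchId) {
    this.mapId = mapId;
    this.attemptId = attemptId;
    this.batchId = batchId;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof BatchKey)) {
      return false;
    }
    BatchKey other = (BatchKey) o;
    return mapId == other.mapId && attemptId == other.attemptId && batchId == other.batchId;
  }

  @Override
  public int hashCode() {
    return Objects.hash(mapId, attemptId, batchId);
  }
}

final class FailedBatchFilter {
  // Returns the batches of one location that are safe to read: everything
  // except batches recorded as push-failed for that location.
  static List<BatchKey> readable(List<BatchKey> batchesInLocation, Set<BatchKey> pushFailed) {
    List<BatchKey> result = new ArrayList<>();
    for (BatchKey batch : batchesInLocation) {
      if (!pushFailed.contains(batch)) {
        result.add(batch);
      }
    }
    return result;
  }
}
```

The idea is that a batch whose push was reported as failed may still have landed on one peer. Skipping it at read time means only the retried copy is consumed.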





Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [celeborn]

Posted by "cfmcgrady (via GitHub)" <gi...@apache.org>.
cfmcgrady commented on PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#issuecomment-2051468165

   > a) If recomputation happens, we should fail the stage and not allow retries - this will prevent data loss.
   
   Totally agree! As @waitinfuture  mentioned [above](https://github.com/apache/celeborn/pull/2373#issuecomment-2041062688), and after discussing with @pan3793 offline, we can disable this feature and the use of `stage rerun` at the same time. I'll fix this later.
   
   > b) We should recommend enabling replication to leverage this feature - this minimizes the risk of data loss which would trigger recomputation.
   
   It seems the replication mechanism is unable to mitigate the risk of data loss in this scenario?
   




Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [incubator-celeborn]

Posted by "wangshengjie123 (via GitHub)" <gi...@apache.org>.
wangshengjie123 commented on code in PR #2373:
URL: https://github.com/apache/incubator-celeborn/pull/2373#discussion_r1536774083


##########
client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java:
##########
@@ -203,6 +252,8 @@ private static final class CelebornInputStreamImpl extends CelebornInputStream {
       this.shuffleCompressionEnabled =
           !conf.shuffleCompressionCodec().equals(CompressionCodec.NONE);
       this.fetchExcludedWorkerExpireTimeout = conf.clientFetchExcludedWorkerExpireTimeout();
+      this.failedBatches = failedBatchSet;
+      this.pushShuffleFailureTrackingEnabled = conf.clientPushFailureTrackingEnabled();

Review Comment:
   Fixed





Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [incubator-celeborn]

Posted by "wangshengjie123 (via GitHub)" <gi...@apache.org>.
wangshengjie123 commented on code in PR #2373:
URL: https://github.com/apache/incubator-celeborn/pull/2373#discussion_r1536775099


##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -136,30 +137,37 @@ protected Compressor initialValue() {
 
   private final ReviveManager reviveManager;
 
+  private final boolean dataPushFailureTrackingEnabled;
+
   protected static class ReduceFileGroups {
     public Map<Integer, Set<PartitionLocation>> partitionGroups;
+    public Set<PushFailedBatch> pushFailedBatchSet;

Review Comment:
   Done, thanks





Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [celeborn]

Posted by "s0nskar (via GitHub)" <gi...@apache.org>.
s0nskar commented on PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#issuecomment-2036578517

   @wangshengjie123 Is there any doc explaining this approach?




Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [celeborn]

Posted by "s0nskar (via GitHub)" <gi...@apache.org>.
s0nskar commented on PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#issuecomment-2037156713

   From my understanding, in this PR we're diverging from the vanilla Spark approach based on mapIndex and just dividing the full partition into multiple sub-partitions based on some heuristics. I'm new to the Celeborn code, so I might be missing something basic, but this PR does not seem to address the issue below. Consider a basic scenario where a partial partition read is happening and we see a FetchFailure.
   
   `ShuffleMapStage --> ResultStage`
                        
   - ShuffleMapStage (attempt 0) generated [P0, P1, P2], and P0 is skewed with partition locations [0, 1, 2, 3, 4, 5].
   - AQE asks for three splits, and this PR's logic will create three sub-partitions: [0, 1], [2, 3], [4, 5].
   - Now suppose the reducer reads [0, 1] and [2, 3] and gets a `FetchFailure` while reading [4, 5].
   - This will trigger a complete mapper stage retry according to this [doc](https://docs.google.com/document/d/1dkG6fww3g99VAb1wkphNlUES_MpngVPNg8601chmVp8/edit) and will clear the map output corresponding to the shuffleID.
   - ShuffleMapStage (attempt 1) will again generate data for P0 at different partition locations [a, b, c, d, e, f], and it will get divided like [a, b], [c, d], [e, f].
   - Now if the reader stage is a `ShuffleMapStage`, it will read every sub-partition again, but if the reader is a `ResultStage`, it will only read the missing partition data, which is [e, f].
   
   The data generated at location `1` and location `a` would be different because of other factors like network delay (the same applies to the other locations). For example, the data present in the 1st location in the first attempt might be present in the 2nd location, or any other location, in a different attempt, because of the order in which the mappers generated the data and the order in which the server received it.
   
   This can cause both data loss and data duplication. It might be addressed somewhere else in the codebase that I'm not aware of, but I wanted to point this problem out.
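
The scenario above can be reproduced with a toy Java simulation. Everything here is hypothetical: six locations hold one record each (`r1` to `r6`), sub-partitions are consecutive pairs of locations, and attempt 1 is assumed to place the same records in reverse order. None of this is Celeborn code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class RetrySplitDemo {
  // Reads sub-partition `index`: the group of `groupSize` consecutive
  // locations starting at index * groupSize.
  static List<String> readSubPartition(List<List<String>> locations, int groupSize, int index) {
    List<String> records = new ArrayList<>();
    int from = index * groupSize;
    int to = Math.min(from + groupSize, locations.size());
    for (int i = from; i < to; i++) {
      records.addAll(locations.get(i));
    }
    return records;
  }

  // Sub-partitions 0 and 1 are read from attempt 0; after the FetchFailure,
  // only sub-partition 2 is re-read, and it comes from attempt 1.
  static List<String> readAcrossAttempts() {
    List<List<String>> attempt0 = Arrays.asList(
        Arrays.asList("r1"), Arrays.asList("r2"), Arrays.asList("r3"),
        Arrays.asList("r4"), Arrays.asList("r5"), Arrays.asList("r6"));
    // Same records, but assumed to reach the locations in a different order.
    List<List<String>> attempt1 = Arrays.asList(
        Arrays.asList("r6"), Arrays.asList("r5"), Arrays.asList("r4"),
        Arrays.asList("r3"), Arrays.asList("r2"), Arrays.asList("r1"));
    List<String> seen = new ArrayList<>();
    seen.addAll(readSubPartition(attempt0, 2, 0));
    seen.addAll(readSubPartition(attempt0, 2, 1));
    seen.addAll(readSubPartition(attempt1, 2, 2));
    return seen;
  }
}
```

Under these assumptions the reader ends up with `r1` and `r2` twice and never sees `r5` or `r6`, which is the duplication and loss described in the comment.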




Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [celeborn]

Posted by "s0nskar (via GitHub)" <gi...@apache.org>.
s0nskar commented on PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#issuecomment-2037829707

   Thanks a lot @waitinfuture for the sort based approach description.
   
   > Is it possible to force make it as indeterministic?
   
   IMO this would be very difficult to do from Celeborn itself, but it can be done by putting a patch in the Spark code. ShuffledRowRDD can set the determinacy level to INDETERMINATE if partial partition reads are happening and Celeborn is being used.
   
   cc: @mridulm for viz




Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [celeborn]

Posted by "s0nskar (via GitHub)" <gi...@apache.org>.
s0nskar commented on PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#issuecomment-2058422163

   > a) If recomputation happens, we should fail the stage and not allow retries - this will prevent data loss.
   > b) We should recommend enabling replication to leverage this feature - this minimizes the risk of data loss which would trigger recomputation.
   
   @mridulm @cfmcgrady Enabling replication is expensive, and some users might not want to enable it. Another way to handle this could be to fail the stage only if it has any skew partition read. This way it will only affect stages where there is skew and will not affect stages or apps without skew, which can increase the overall reliability for a large percentage of apps. We can make it configurable to give more control to the user. WDYT?




Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [celeborn]

Posted by "cfmcgrady (via GitHub)" <gi...@apache.org>.
cfmcgrady commented on PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#issuecomment-2041499262

   To reviewers:
   Just wanted to give you an update on my recent validation work on our internal cluster, splitting skewed partitions at the Celeborn split level. I ran a job with the default Celeborn split size of 1GB and a Spark advisoryPartitionSize of 64MB. However, I noticed that only 1 in 16 tasks fetched shuffle data to run, while the rest were empty.
   
   After discussing this with @waitinfuture, @wangshengjie123, and @pan3793, we decided to leverage chunks to split skewed partitions and obtain sub-partitions with finer-grained data sizes. This was implemented in https://github.com/apache/celeborn/pull/2373/commits/dfeb731da692aeef1c513f5ac3837275146009f5 and I tested it on my internal cluster with online tasks. The performance of the Shuffle Read stage was almost as good as ESS.
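
The granularity effect can be checked with a little arithmetic in Java. The 1 GB split size and 64 MB advisory size come from the comment above. The 8 MB chunk size and the even distribution of units over sub-partitions are assumptions made for illustration; this is not the assignment logic of the linked commit.

```java
final class SplitGranularityDemo {
  // Distributes `units` storage units (splits or chunks) over `n`
  // sub-partitions as evenly as possible and returns the count per sub-partition.
  static int[] distribute(int units, int n) {
    int[] counts = new int[n];
    for (int i = 0; i < n; i++) {
      counts[i] = units / n + (i < units % n ? 1 : 0);
    }
    return counts;
  }

  // Number of sub-partitions that received at least one unit.
  static int nonEmpty(int[] counts) {
    int result = 0;
    for (int c : counts) {
      if (c > 0) {
        result++;
      }
    }
    return result;
  }

  public static void main(String[] args) {
    int partitionMb = 1024; // one 1 GB split holds the whole skewed partition
    int advisoryMb = 64;    // advisory partition size requested by AQE
    int chunkMb = 8;        // assumed chunk size, for illustration only
    int subPartitions = partitionMb / advisoryMb; // 16 reduce tasks
    // Split-level: one unit for 16 tasks, so only one task gets data.
    System.out.println(nonEmpty(distribute(1, subPartitions)));
    // Chunk-level: 128 chunks for 16 tasks, so every task gets 8 chunks (64 MB).
    System.out.println(nonEmpty(distribute(partitionMb / chunkMb, subPartitions)));
  }
}
```

At split granularity a single 1 GB split cannot be shared, so 15 of the 16 tasks are empty. At the assumed chunk granularity each task receives 8 chunks, matching the 64 MB advisory size.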




Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [celeborn]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#issuecomment-2097429735

   Thanks for writing it up @s0nskar, very helpful ! I have added some clarifying comments there.
   




Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [celeborn]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#discussion_r1562071909


##########
assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_3.patch:
##########
@@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+index af03ad9a4cb..3b5c7ce4fce 100644
+--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
++++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+@@ -3784,6 +3784,12 @@ object SQLConf {
+     .booleanConf
+     .createWithDefault(false)
+ 
++  val CELEBORN_CLIENT_ADAPTIVE_OPTIMIZE_SKEWED_PARTITION_READ =
++    buildConf("spark.celeborn.client.adaptive.optimizeSkewedPartitionRead.enabled")
++      .version("3.3.0")
++      .booleanConf
++      .createWithDefault(false)
++
+   /**
+    * Holds information about keys that have been deprecated.
+    *
+@@ -4549,6 +4555,9 @@ class SQLConf extends Serializable with Logging {
+   def histogramNumericPropagateInputType: Boolean =
+     getConf(SQLConf.HISTOGRAM_NUMERIC_PROPAGATE_INPUT_TYPE)
+ 
++  def celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled: Boolean =
++    getConf(SQLConf.CELEBORN_CLIENT_ADAPTIVE_OPTIMIZE_SKEWED_PARTITION_READ)
++
+   /** ********************** SQLConf functionality methods ************ */
+ 
+   /** Set Spark SQL configuration properties. */
+diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
+index af689db3379..38e54b3ed0a 100644
+--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
++++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
+@@ -22,6 +22,8 @@ import scala.collection.mutable.ArrayBuffer
+ import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster, SparkEnv}
+ import org.apache.spark.internal.Logging
+ import org.apache.spark.sql.execution.{CoalescedPartitionSpec, PartialReducerPartitionSpec, ShufflePartitionSpec}
++import org.apache.spark.sql.internal.SQLConf
++import org.apache.spark.util.Utils
+ 
+ object ShufflePartitionsUtil extends Logging {
+   final val SMALL_PARTITION_FACTOR = 0.2
+@@ -387,6 +389,10 @@ object ShufflePartitionsUtil extends Logging {
+     val mapStartIndices = splitSizeListByTargetSize(
+       mapPartitionSizes, targetSize, smallPartitionFactor)
+     if (mapStartIndices.length > 1) {
++      // If Celeborn is enabled, split skew partitions without shuffle mapper-range reading
++      val isCelebornClientAdaptiveOptimizeSkewedPartitionReadEnabled = Utils.isCelebornEnabled(SparkEnv.get.conf) &&
++        SQLConf.get.celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled
++
+       Some(mapStartIndices.indices.map { i =>
+         val startMapIndex = mapStartIndices(i)
+         val endMapIndex = if (i == mapStartIndices.length - 1) {
+@@ -400,7 +406,11 @@ object ShufflePartitionsUtil extends Logging {
+           dataSize += mapPartitionSizes(mapIndex)
+           mapIndex += 1
+         }
+-        PartialReducerPartitionSpec(reducerId, startMapIndex, endMapIndex, dataSize)
++        if (isCelebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) {
++          PartialReducerPartitionSpec(reducerId, mapStartIndices.length, i, -1)

Review Comment:
   When `isCelebornClientAdaptiveOptimizeSkewedPartitionReadEnabled` is set, can we skip the preceding `dataSize` computation entirely?
   That is, this loop:
   ```
           var dataSize = 0L
           var mapIndex = startMapIndex
           while (mapIndex < endMapIndex) {
             dataSize += mapPartitionSizes(mapIndex)
             mapIndex += 1
           }
   ```



##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -1025,6 +1034,10 @@ public void onSuccess(ByteBuffer response) {
                       attemptId,
                       partitionId,
                       nextBatchId);
+                  if (dataPushFailureTrackingEnabled) {
+                    pushState.addFailedBatch(
+                        latest.getUniqueId(), new PushFailedBatch(mapId, attemptId, nextBatchId));

Review Comment:
   QQ: Do we want to track this here (and in the other `onFailure` callbacks below)?
   This could simply be a retriable error, right?



##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -135,30 +136,37 @@ protected Compressor initialValue() {
 
   private final ReviveManager reviveManager;
 
+  private final boolean dataPushFailureTrackingEnabled;
+
   protected static class ReduceFileGroups {
     public Map<Integer, Set<PartitionLocation>> partitionGroups;
+    public Map<String, Set<PushFailedBatch>> pushFailedBatches;

Review Comment:
   QQ: This is essentially relevant only when hard_splits happen, right?
   In all other cases, either we retry and it succeeds, or it fails?
   I want to make sure I am not missing some other cases.



##########
common/src/main/java/org/apache/celeborn/common/write/PushFailedBatch.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.write;
+
+import java.io.Serializable;
+
+import com.google.common.base.Objects;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+public class PushFailedBatch implements Serializable {
+
+  private int mapId;
+  private int attemptId;
+  private int batchId;
+
+  public int getMapId() {
+    return mapId;
+  }
+
+  public void setMapId(int mapId) {
+    this.mapId = mapId;
+  }
+
+  public int getAttemptId() {
+    return attemptId;
+  }
+
+  public void setAttemptId(int attemptId) {
+    this.attemptId = attemptId;
+  }
+
+  public int getBatchId() {
+    return batchId;
+  }
+
+  public void setBatchId(int batchId) {
+    this.batchId = batchId;
+  }
+
+  public PushFailedBatch(int mapId, int attemptId, int batchId) {
+    this.mapId = mapId;
+    this.attemptId = attemptId;
+    this.batchId = batchId;
+  }

Review Comment:
   nit: move constructor to above get/set methods (after field definitions).



##########
common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala:
##########
@@ -279,7 +281,9 @@ object ControlMessages extends Logging {
       status: StatusCode,
       fileGroup: util.Map[Integer, util.Set[PartitionLocation]],
       attempts: Array[Int],
-      partitionIds: util.Set[Integer] = new util.HashSet[Integer]())
+      partitionIds: util.Set[Integer] = new util.HashSet[Integer](),
+      pushFailedBatches: util.Map[String, util.Set[PushFailedBatch]] =
+        new util.HashMap[String, util.Set[PushFailedBatch]]())

Review Comment:
   ```suggestion
           Collections.emptyMap())
   ```





Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [incubator-celeborn]

Posted by "wangshengjie123 (via GitHub)" <gi...@apache.org>.
wangshengjie123 commented on code in PR #2373:
URL: https://github.com/apache/incubator-celeborn/pull/2373#discussion_r1536773133


##########
client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala:
##########
@@ -244,6 +264,18 @@ class ReducePartitionCommitHandler(
       val attempts = shuffleMapperAttempts.get(shuffleId)
       if (attempts(mapId) < 0) {
         attempts(mapId) = attemptId
+        if (null != pushFailedBatches && !pushFailedBatches.isEmpty) {
+          val pushFailedBatchesMap = shufflePushFailedBatches.computeIfAbsent(
+            shuffleId,
+            newShuffleId2PushFailedBatchMapFunc)
+          pushFailedBatches.forEach((k, v) => {
+            val partitionPushFailedBatches = pushFailedBatchesMap.computeIfAbsent(
+              k,
+              uniqueId2PushFailedBatchMapFunc)
+            partitionPushFailedBatches.addAll(v)
+          })
+          pushFailedBatchesMap.get(pushFailedBatches)

Review Comment:
   Yes, removed





Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [incubator-celeborn]

Posted by "wangshengjie123 (via GitHub)" <gi...@apache.org>.
wangshengjie123 commented on code in PR #2373:
URL: https://github.com/apache/incubator-celeborn/pull/2373#discussion_r1536773046


##########
client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java:
##########
@@ -615,6 +663,17 @@ private boolean fillBuffer() throws IOException {
 
           // de-duplicate
           if (attemptId == attempts[mapId]) {
+            if (splitSkewPartitionWithoutMapRange) {

Review Comment:
   1. Got it.
   2. Fixed to avoid the NPE.





Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [incubator-celeborn]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #2373:
URL: https://github.com/apache/incubator-celeborn/pull/2373#discussion_r1522559926


##########
common/src/main/java/org/apache/celeborn/common/write/PushFailedBatch.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.write;
+
+import java.io.Serializable;
+
+import com.google.common.base.Objects;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+public class PushFailedBatch implements Serializable {
+
+  private int mapId;
+  private int attemptId;
+  private int batchId;
+  private int epoch;
+  private int reduceId;
+
+  public int getMapId() {
+    return mapId;
+  }
+
+  public void setMapId(int mapId) {
+    this.mapId = mapId;
+  }
+
+  public int getAttemptId() {
+    return attemptId;
+  }
+
+  public void setAttemptId(int attemptId) {
+    this.attemptId = attemptId;
+  }
+
+  public int getBatchId() {
+    return batchId;
+  }
+
+  public void setBatchId(int batchId) {
+    this.batchId = batchId;
+  }
+
+  public int getReduceId() {
+    return reduceId;
+  }
+
+  public void setReduceId(int reduceId) {
+    this.reduceId = reduceId;
+  }
+
+  public int getEpoch() {
+    return epoch;
+  }
+
+  public void setEpoch(int epoch) {
+    this.epoch = epoch;
+  }
+
+  public PushFailedBatch(int mapId, int attemptId, int batchId, int reduceId, int epoch) {
+    this.mapId = mapId;
+    this.attemptId = attemptId;
+    this.batchId = batchId;
+    this.reduceId = reduceId;
+    this.epoch = epoch;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof PushFailedBatch)) {
+      return false;
+    }
+    PushFailedBatch o = (PushFailedBatch) other;
+    return super.equals(o)

Review Comment:
   `super.equals` will return `false` for any two distinct instances, since `Object.equals` is based on reference equality.
   (And if `this == other`, we really don't need the remaining checks.)



##########
client/src/main/scala/org/apache/celeborn/client/CommitManager.scala:
##########
@@ -207,13 +209,15 @@ class CommitManager(appUniqueId: String, val conf: CelebornConf, lifecycleManage
       mapId: Int,
       attemptId: Int,
       numMappers: Int,
-      partitionId: Int = -1): (Boolean, Boolean) = {
+      partitionId: Int = -1,
+      pushFailedBatches: util.Set[PushFailedBatch] = Sets.newHashSet()): (Boolean, Boolean) = {

Review Comment:
   Can't this be an immutable default?
   
   ```suggestion
         pushFailedBatches: util.Set[PushFailedBatch] = Collections.emptySet()): (Boolean, Boolean) = {
   ```
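
For readers less familiar with the idiom, the difference between the two defaults can be sketched in plain Java. This is only an illustration: `EmptyDefaultDemo`, `countFailed` and `isReadOnly` are hypothetical names, not Celeborn APIs. `Collections.emptySet()` returns a set that rejects mutation, so it only suits callers and callees that never add to the default.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

final class EmptyDefaultDemo {
  // Hypothetical stand-in for a method that receives an optional set of failed batch ids.
  static int countFailed(Set<Integer> failedBatchIds) {
    return failedBatchIds.size();
  }

  // Returns true if the given set rejects mutation.
  static boolean isReadOnly(Set<Integer> set) {
    try {
      set.add(1);
      return false;
    } catch (UnsupportedOperationException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    Set<Integer> immutableDefault = Collections.emptySet();
    Set<Integer> mutableDefault = new HashSet<>();
    System.out.println(countFailed(immutableDefault)); // prints 0
    System.out.println(isReadOnly(immutableDefault));  // prints true
    System.out.println(isReadOnly(mutableDefault));    // prints false
  }
}
```

The trade-off is that any code path that later tries to add to the default value would throw, so the immutable default is appropriate only if the parameter is treated as read-only.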



##########
client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java:
##########
@@ -87,6 +96,41 @@ public static CelebornInputStream create(
     }
   }
 
+  public static PartitionLocation[] getSkewPartitionLocations(
+      PartitionLocation[] locations, int subPartitionSize, int subPartitionIndex) {
+    Set<PartitionLocation> sortSet =
+        new TreeSet<>(
+            (o1, o2) -> {
+              if (o1.getStorageInfo().fileSize > o2.getStorageInfo().fileSize) {
+                return 1;
+              } else if (o1.getStorageInfo().fileSize < o2.getStorageInfo().fileSize) {
+                return -1;
+              } else {
+                return o1.hashCode() - o2.hashCode();
+              }
+            });
+    sortSet.addAll(Arrays.asList(locations));
+    PartitionLocation[] orderedPartitionLocations = sortSet.toArray(new PartitionLocation[0]);

Review Comment:
   If we are not expecting duplicates (which we are not), simply use array sort with the same comparator ?
   (Also incorporating @waitinfuture's suggestion of unique id)
   
   ```suggestion
       Arrays.sort(locations,
               (o1, o2) -> {
                 int cmp = Long.compare(o1.getStorageInfo().fileSize, o2.getStorageInfo().fileSize);
                 return 0 != cmp ? cmp : o1.getUniqueId().compareTo(o2.getUniqueId());
               }
       );
   ```
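
A self-contained sketch of this suggestion follows, assuming a minimal hypothetical `Loc` class in place of `PartitionLocation` (only a unique id and a file size). It also sidesteps two hazards of the `hashCode()` tie-break in the original comparator: the subtraction can overflow, and two distinct locations with equal size and equal hash would compare as 0, so the `TreeSet` would silently drop one of them. Sorting a copy rather than the input array is a choice made here for the example, not something the suggestion requires.

```java
import java.util.Arrays;
import java.util.Comparator;

final class LocationSortSketch {
  // Hypothetical stand-in for PartitionLocation: only the fields the comparator needs.
  static final class Loc {
    final String uniqueId;
    final long fileSize;

    Loc(String uniqueId, long fileSize) {
      this.uniqueId = uniqueId;
      this.fileSize = fileSize;
    }
  }

  // Order by file size, breaking ties with the unique id so the order is total and stable across runs.
  static final Comparator<Loc> BY_SIZE_THEN_ID =
      (o1, o2) -> {
        int cmp = Long.compare(o1.fileSize, o2.fileSize);
        return cmp != 0 ? cmp : o1.uniqueId.compareTo(o2.uniqueId);
      };

  // Returns a sorted copy and leaves the caller's array untouched.
  static Loc[] sortedCopy(Loc[] locations) {
    Loc[] copy = locations.clone();
    Arrays.sort(copy, BY_SIZE_THEN_ID);
    return copy;
  }

  public static void main(String[] args) {
    Loc[] input = {new Loc("b", 10L), new Loc("a", 10L), new Loc("c", 5L)};
    for (Loc loc : sortedCopy(input)) {
      System.out.println(loc.uniqueId + " " + loc.fileSize);
    }
  }
}
```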



##########
client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala:
##########
@@ -291,7 +307,12 @@ class ReducePartitionCommitHandler(
               val returnedMsg = GetReducerFileGroupResponse(
                 StatusCode.SUCCESS,
                 reducerFileGroupsMap.getOrDefault(shuffleId, JavaUtils.newConcurrentHashMap()),
-                getMapperAttempts(shuffleId))
+                getMapperAttempts(shuffleId),
+                pushFailedBatches =
+                  shufflePushFailedBatches.getOrDefault(
+                    shuffleId,
+                    JavaUtils.newConcurrentHashMap()).values().asScala.flatMap(x =>
+                    x.asScala.toSet[PushFailedBatch]).toSet.asJava)

Review Comment:
   Will this not work ?
   
   ```suggestion
                       JavaUtils.newConcurrentHashMap()).values().asScala.flatMap(_.asScala).toSet.asJava)
   ```



##########
client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java:
##########
@@ -87,6 +96,41 @@ public static CelebornInputStream create(
     }
   }
 
+  public static PartitionLocation[] getSkewPartitionLocations(
+      PartitionLocation[] locations, int subPartitionSize, int subPartitionIndex) {
+    Set<PartitionLocation> sortSet =
+        new TreeSet<>(
+            (o1, o2) -> {
+              if (o1.getStorageInfo().fileSize > o2.getStorageInfo().fileSize) {
+                return 1;
+              } else if (o1.getStorageInfo().fileSize < o2.getStorageInfo().fileSize) {
+                return -1;
+              } else {
+                return o1.hashCode() - o2.hashCode();
+              }
+            });
+    sortSet.addAll(Arrays.asList(locations));
+    PartitionLocation[] orderedPartitionLocations = sortSet.toArray(new PartitionLocation[0]);
+
+    List<PartitionLocation> result = new LinkedList<>();
+
+    int step = locations.length / subPartitionSize;
+
+    // if partition location is [1,2,3,4,5,6,7,8,9,10], and skew partition split to 3 task:

Review Comment:
   If I am not wrong, the idea is to minimize the per-row (per-task) total size, which is why column 0 goes "down" the array index while column 1 goes "up", and so on alternating. As the sizes keep increasing, they are distributed more evenly across the rows (essentially a way to approximate the multi-way partition problem).
   
   The result would be different for the formulation above, @waitinfuture.
   
   For example:
   
   partition sizes: {1000, 1100, 1300, 1400, 2000, 2500, 3000, 10000, 20000, 25000, 28000, 30000} 
   `subPartitionSize` == 3 
   `subPartitionIndex` == 1
   
   In formulation from PR we have: 
   
   task 0: 1000 , 2500 , 3000 , 30000
   task 1: 1100 , 2000 , 10000 , 28000
   task 2: 1300 , 1400 , 20000 , 25000
   
   So the sizes will be:
   task 0: 36500
   task 1: 41100
   task 2: 47700
   
   
   As formulated above, we will end up with:
   
   task 0: 1000 , 1400 , 3000 , 25000
   task 1: 1100 , 2000 , 10000 , 28000
   task 2: 1300 , 2500 , 20000 , 30000
   
   In this case, the sizes will be:
   task 0: 30400
   task 1: 41100
   task 2: 53800
   
   
   Personally, I would have looked into either largest remainder or knapsack heuristic (given we are sorting anyway).
   
   (Do let me know if I am missing something here @wangshengjie123)
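
The two sets of totals quoted in this comment can be reproduced with a small sketch. It assumes the input is already sorted ascending and models the PR's formulation as a serpentine assignment (even rows left to right, odd rows right to left) and the alternative as plain round-robin. `SplitBalanceSketch` and its methods are hypothetical names for illustration, not code from the PR.

```java
import java.util.Arrays;

final class SplitBalanceSketch {
  // Serpentine assignment: even rows hand out sizes to tasks 0..n-1, odd rows to tasks n-1..0.
  static long[] serpentine(long[] sortedSizes, int tasks) {
    long[] totals = new long[tasks];
    for (int i = 0; i < sortedSizes.length; i++) {
      int row = i / tasks;
      int col = i % tasks;
      int task = (row % 2 == 0) ? col : tasks - 1 - col;
      totals[task] += sortedSizes[i];
    }
    return totals;
  }

  // Round-robin assignment: element i always goes to task i % n.
  static long[] roundRobin(long[] sortedSizes, int tasks) {
    long[] totals = new long[tasks];
    for (int i = 0; i < sortedSizes.length; i++) {
      totals[i % tasks] += sortedSizes[i];
    }
    return totals;
  }

  public static void main(String[] args) {
    long[] sizes = {1000, 1100, 1300, 1400, 2000, 2500, 3000, 10000, 20000, 25000, 28000, 30000};
    System.out.println(Arrays.toString(serpentine(sizes, 3))); // prints [36500, 41100, 47700]
    System.out.println(Arrays.toString(roundRobin(sizes, 3))); // prints [30400, 41100, 53800]
  }
}
```

On this input the spread between the lightest and heaviest task is 11200 for the serpentine order and 23400 for round-robin, which matches the numbers in the comment.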



##########
client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java:
##########
@@ -132,6 +176,8 @@ private static final class CelebornInputStreamImpl extends CelebornInputStream {
 
     private Map<Integer, Set<Integer>> batchesRead = new HashMap<>();
 
+    private final Set<PushFailedBatch> failedBatches;

Review Comment:
   Took me a little bit to understand why this would help - nice idea !



##########
client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java:
##########
@@ -87,6 +96,41 @@ public static CelebornInputStream create(
     }
   }
 
+  public static PartitionLocation[] getSkewPartitionLocations(
+      PartitionLocation[] locations, int subPartitionSize, int subPartitionIndex) {
+    Set<PartitionLocation> sortSet =
+        new TreeSet<>(
+            (o1, o2) -> {
+              if (o1.getStorageInfo().fileSize > o2.getStorageInfo().fileSize) {
+                return 1;
+              } else if (o1.getStorageInfo().fileSize < o2.getStorageInfo().fileSize) {
+                return -1;
+              } else {
+                return o1.hashCode() - o2.hashCode();
+              }
+            });
+    sortSet.addAll(Arrays.asList(locations));
+    PartitionLocation[] orderedPartitionLocations = sortSet.toArray(new PartitionLocation[0]);
+
+    List<PartitionLocation> result = new LinkedList<>();

Review Comment:
   (and move it below `step` initialization)
   ```suggestion
       List<PartitionLocation> result = new ArrayList<>(step + 1);
   ```



##########
common/src/main/java/org/apache/celeborn/common/write/PushFailedBatch.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.write;
+
+import java.io.Serializable;
+
+import com.google.common.base.Objects;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+public class PushFailedBatch implements Serializable {
+
+  private int mapId;
+  private int attemptId;
+  private int batchId;
+  private int epoch;
+  private int reduceId;
+
+  public int getMapId() {
+    return mapId;
+  }
+
+  public void setMapId(int mapId) {
+    this.mapId = mapId;
+  }
+
+  public int getAttemptId() {
+    return attemptId;
+  }
+
+  public void setAttemptId(int attemptId) {
+    this.attemptId = attemptId;
+  }
+
+  public int getBatchId() {
+    return batchId;
+  }
+
+  public void setBatchId(int batchId) {
+    this.batchId = batchId;
+  }
+
+  public int getReduceId() {
+    return reduceId;
+  }
+
+  public void setReduceId(int reduceId) {
+    this.reduceId = reduceId;
+  }
+
+  public int getEpoch() {
+    return epoch;
+  }
+
+  public void setEpoch(int epoch) {
+    this.epoch = epoch;
+  }
+
+  public PushFailedBatch(int mapId, int attemptId, int batchId, int reduceId, int epoch) {
+    this.mapId = mapId;
+    this.attemptId = attemptId;
+    this.batchId = batchId;
+    this.reduceId = reduceId;
+    this.epoch = epoch;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof PushFailedBatch)) {
+      return false;
+    }
+    PushFailedBatch o = (PushFailedBatch) other;
+    return super.equals(o)
+        && mapId == o.mapId
+        && attemptId == o.attemptId
+        && batchId == o.batchId
+        && reduceId == o.reduceId
+        && epoch == o.epoch;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(super.hashCode(), mapId, attemptId, batchId, reduceId, epoch);

Review Comment:
   Avoid `super.hashCode` here; otherwise two instances with exactly the same fields will hash differently, and you won't be able to look one up using the other.
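
A minimal sketch of value-based `equals` and `hashCode` for a class of this shape is shown below. `FailedBatchKey` is a hypothetical stand-in for `PushFailedBatch`, and it uses `java.util.Objects.hash` rather than Guava's `Objects.hashCode` only to stay dependency-free. Neither method calls `super`, so two instances with the same fields are interchangeable as set members or map keys.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

final class FailedBatchKey {
  private final int mapId;
  private final int attemptId;
  private final int batchId;
  private final int reduceId;
  private final int epoch;

  FailedBatchKey(int mapId, int attemptId, int batchId, int reduceId, int epoch) {
    this.mapId = mapId;
    this.attemptId = attemptId;
    this.batchId = batchId;
    this.reduceId = reduceId;
    this.epoch = epoch;
  }

  @Override
  public boolean equals(Object other) {
    if (this == other) {
      return true; // same reference, no field checks needed
    }
    if (!(other instanceof FailedBatchKey)) {
      return false;
    }
    FailedBatchKey o = (FailedBatchKey) other;
    return mapId == o.mapId
        && attemptId == o.attemptId
        && batchId == o.batchId
        && reduceId == o.reduceId
        && epoch == o.epoch;
  }

  @Override
  public int hashCode() {
    // Hash only the fields compared in equals, never the identity hash.
    return Objects.hash(mapId, attemptId, batchId, reduceId, epoch);
  }

  public static void main(String[] args) {
    Set<FailedBatchKey> failed = new HashSet<>();
    failed.add(new FailedBatchKey(1, 0, 7, 3, 0));
    // A fresh instance with identical fields is found in the set.
    System.out.println(failed.contains(new FailedBatchKey(1, 0, 7, 3, 0))); // prints true
    System.out.println(failed.contains(new FailedBatchKey(1, 0, 8, 3, 0))); // prints false
  }
}
```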



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala:
##########
@@ -223,8 +227,8 @@ class FetchHandler(
           // 1. when the current request is a non-range openStream, but the original unsorted file
           //    has been deleted by another range's openStream request.
           // 2. when the current request is a range openStream request.
-          if ((endIndex != Int.MaxValue) || (endIndex == Int.MaxValue && !fileInfo.addStream(
-              streamId))) {
+          if (shuffleNeedSort && ((endIndex != Int.MaxValue) ||
+              (endIndex == Int.MaxValue && !fileInfo.addStream(streamId)))) {

Review Comment:
   nit: To improve clarity a bit
   ```suggestion
             if (shuffleNeedSort &&
                   (endIndex != Int.MaxValue || (endIndex == Int.MaxValue && !fileInfo.addStream(streamId)))) {
   ```





Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [incubator-celeborn]

Posted by "lyy-pineapple (via GitHub)" <gi...@apache.org>.
lyy-pineapple commented on code in PR #2373:
URL: https://github.com/apache/incubator-celeborn/pull/2373#discussion_r1519595369


##########
client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java:
##########
@@ -87,6 +96,41 @@ public static CelebornInputStream create(
     }
   }
 
+  public static PartitionLocation[] getSkewPartitionLocations(
+      PartitionLocation[] locations, int subPartitionSize, int subPartitionIndex) {
+    Set<PartitionLocation> sortSet =
+        new TreeSet<>(
+            (o1, o2) -> {
+              if (o1.getStorageInfo().fileSize > o2.getStorageInfo().fileSize) {
+                return 1;
+              } else if (o1.getStorageInfo().fileSize < o2.getStorageInfo().fileSize) {
+                return -1;
+              } else {
+                return o1.hashCode() - o2.hashCode();
+              }
+            });
+    sortSet.addAll(Arrays.asList(locations));
+    PartitionLocation[] orderedPartitionLocations = sortSet.toArray(new PartitionLocation[0]);
+
+    List<PartitionLocation> result = new LinkedList<>();
+
+    int step = locations.length / subPartitionSize;
+
+    // if partition location is [1,2,3,4,5,6,7,8,9,10], and skew partition split to 3 task:
+    // task 0: 1, 6, 7
+    // task 1: 2, 5, 8
+    // task 2: 3, 4, 9, 10
+    for (int i = 0; i < step + 1; i++) {
+      if (i % 2 == 0 && (i * 3 + subPartitionIndex) < locations.length) {

Review Comment:
   (i % 2 == 0 && (i * 3 + subPartitionIndex) < locations.length





Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [celeborn]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#issuecomment-2076295830

   QQ: thinking out loud, instead of this change, do we want to proactively trigger a sort for reducers where we are reading a subset of the mapper output (based on `ShufflePartitionsUtil`)?
   
   This will help if we are trying to mitigate the impact of reducer read timeouts, etc.
   It won't bring down the overall load though.
   
   On the plus side, it does not suffer from the correctness issues here.
   
   Thoughts?




Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [celeborn]

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#issuecomment-2076435620

   > In general, this is going to be very close to 100%
   
   Do you mean that close to 100% of shuffle reads will be in map-range fashion (that is, the skewed-join case that hits Spark's optimization), or am I misunderstanding something? In my experience the ratio is relatively low.
   
   IMHO, if we can figure out ahead of time which partitions will be read in map-range fashion (which in my understanding is quite difficult, since it currently depends on Spark's re-optimization before each stage), we can apply your proposal and pre-sort them in the current implementation. Additionally, we can still keep working on this PR so users can choose which one to use, because this PR eliminates sorting completely, which is quite attractive to me :)




Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [celeborn]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#issuecomment-2097428792

   >> OR we can treat skewed shuffleIDs as INDETERMINATE stages and let Spark handle the stages retries and abortion accordingly.
   > I think this is the key idea, if this is doable, I would prefer this approach : )
   
   This gets tricky, and depends on when we 'apply' the diff.
   It has to be detected and applied before `DAGScheduler` uses it, when the stage fails with a fetch failure.
   Not that it can't be done, but it is invasive and unfortunately unstable (this can arbitrarily evolve within the scheduler).




Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [incubator-celeborn]

Posted by "wangshengjie123 (via GitHub)" <gi...@apache.org>.
wangshengjie123 closed pull request #2373: [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files
URL: https://github.com/apache/incubator-celeborn/pull/2373




Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [incubator-celeborn]

Posted by "lyy-pineapple (via GitHub)" <gi...@apache.org>.
lyy-pineapple commented on code in PR #2373:
URL: https://github.com/apache/incubator-celeborn/pull/2373#discussion_r1519595369


##########
client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java:
##########
@@ -87,6 +96,41 @@ public static CelebornInputStream create(
     }
   }
 
+  public static PartitionLocation[] getSkewPartitionLocations(
+      PartitionLocation[] locations, int subPartitionSize, int subPartitionIndex) {
+    Set<PartitionLocation> sortSet =
+        new TreeSet<>(
+            (o1, o2) -> {
+              if (o1.getStorageInfo().fileSize > o2.getStorageInfo().fileSize) {
+                return 1;
+              } else if (o1.getStorageInfo().fileSize < o2.getStorageInfo().fileSize) {
+                return -1;
+              } else {
+                return o1.hashCode() - o2.hashCode();
+              }
+            });
+    sortSet.addAll(Arrays.asList(locations));
+    PartitionLocation[] orderedPartitionLocations = sortSet.toArray(new PartitionLocation[0]);
+
+    List<PartitionLocation> result = new LinkedList<>();
+
+    int step = locations.length / subPartitionSize;
+
+    // if partition location is [1,2,3,4,5,6,7,8,9,10], and skew partition split to 3 task:
+    // task 0: 1, 6, 7
+    // task 1: 2, 5, 8
+    // task 2: 3, 4, 9, 10
+    for (int i = 0; i < step + 1; i++) {
+      if (i % 2 == 0 && (i * 3 + subPartitionIndex) < locations.length) {

Review Comment:
   The hard-coded `3` should be `subPartitionSize`: `i % 2 == 0 && (i * subPartitionSize + subPartitionIndex) < locations.length`
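
To make the point concrete, here is a sketch of the per-task index selection with the row width parameterized. The odd-row branch is not visible in the quoted hunk, so the mirrored offset used here is an assumption chosen to reproduce the `task 0: 1, 6, 7` example from the code comment. `SubPartitionIndexSketch` and `indicesFor` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

final class SubPartitionIndexSketch {
  // Returns the 0-based indices into the size-sorted location array that one sub-partition reads.
  static List<Integer> indicesFor(int numLocations, int subPartitionSize, int subPartitionIndex) {
    List<Integer> picked = new ArrayList<>();
    int rows = numLocations / subPartitionSize + 1;
    for (int row = 0; row < rows; row++) {
      // Even rows use the task's own offset; odd rows use the mirrored offset (assumed).
      int offset = (row % 2 == 0) ? subPartitionIndex : subPartitionSize - 1 - subPartitionIndex;
      int index = row * subPartitionSize + offset;
      if (index < numLocations) {
        picked.add(index);
      }
    }
    return picked;
  }

  public static void main(String[] args) {
    // 10 locations split across 3 tasks, as in the code comment (indices are 0-based here).
    for (int task = 0; task < 3; task++) {
      System.out.println("task " + task + ": " + indicesFor(10, 3, task));
    }
  }
}
```

With `subPartitionSize` in place of the literal, the same function also covers splits into any other number of tasks. For example, 10 locations across 4 tasks yields `[0, 7, 8]`, `[1, 6, 9]`, `[2, 5]` and `[3, 4]`, so every location is read exactly once.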





Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [celeborn]

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#issuecomment-2040911125

   > It has been a while since I looked at this PR - but as formulated, the split into subranges is deterministic (if it is not, it should be made so). With that in place, this would not be an issue ... (I will take a deeper look later next week, but do let me know if I am missing something so that I can add that to my analysis)
   
   > the split into subranges is deterministic
   
   The way Celeborn splits partitions is not deterministic across stage reruns; for example, any push failure will cause a split, so I'm afraid this statement does not hold...




Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [celeborn]

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#issuecomment-2041062688

   > Ah, I see what you mean ... `PartitionLocation` would change between retries. Yeah, this is a problem then - it will cause data loss. This would be a variant of SPARK-23207
   > 
   > I will need to relook at the PR, and how it interacts with Celeborn - but if scenarios directly described in SPARK-23207 (or variants of it) are applicable (and we can't mitigate it), we should not proceed down this path given the correctness implications unfortunately.
   
   Maybe we can keep both this optimization and stage rerun, but allow only one of them to take effect at a time by checking configs for now. The performance issue this PR solves does happen in production.




Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [incubator-celeborn]

Posted by "wangshengjie123 (via GitHub)" <gi...@apache.org>.
wangshengjie123 commented on code in PR #2373:
URL: https://github.com/apache/incubator-celeborn/pull/2373#discussion_r1536775472


##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala:
##########
@@ -223,8 +227,8 @@ class FetchHandler(
           // 1. when the current request is a non-range openStream, but the original unsorted file
           //    has been deleted by another range's openStream request.
           // 2. when the current request is a range openStream request.
-          if ((endIndex != Int.MaxValue) || (endIndex == Int.MaxValue && !fileInfo.addStream(
-              streamId))) {
+          if (shuffleNeedSort && ((endIndex != Int.MaxValue) ||
+              (endIndex == Int.MaxValue && !fileInfo.addStream(streamId)))) {

Review Comment:
   Done





Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [incubator-celeborn]

Posted by "wangshengjie123 (via GitHub)" <gi...@apache.org>.
wangshengjie123 commented on code in PR #2373:
URL: https://github.com/apache/incubator-celeborn/pull/2373#discussion_r1536774152


##########
common/src/main/java/org/apache/celeborn/common/write/PushState.java:
##########
@@ -33,9 +35,12 @@ public class PushState {
   public AtomicReference<IOException> exception = new AtomicReference<>();
   private final InFlightRequestTracker inFlightRequestTracker;
 
+  private Set<PushFailedBatch> failedBatchSet;
+
   public PushState(CelebornConf conf) {
     pushBufferMaxSize = conf.clientPushBufferMaxSize();
     inFlightRequestTracker = new InFlightRequestTracker(conf, this);
+    failedBatchSet = new HashSet<>();

Review Comment:
   Got it, fixed





Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [incubator-celeborn]

Posted by "wangshengjie123 (via GitHub)" <gi...@apache.org>.
wangshengjie123 commented on code in PR #2373:
URL: https://github.com/apache/incubator-celeborn/pull/2373#discussion_r1536774655


##########
client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java:
##########
@@ -132,6 +176,8 @@ private static final class CelebornInputStreamImpl extends CelebornInputStream {
 
     private Map<Integer, Set<Integer>> batchesRead = new HashMap<>();
 
+    private final Set<PushFailedBatch> failedBatches;

Review Comment:
   Done, thanks for your advice





Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [incubator-celeborn]

Posted by "wangshengjie123 (via GitHub)" <gi...@apache.org>.
wangshengjie123 commented on code in PR #2373:
URL: https://github.com/apache/incubator-celeborn/pull/2373#discussion_r1536773046


##########
client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java:
##########
@@ -615,6 +663,17 @@ private boolean fillBuffer() throws IOException {
 
           // de-duplicate
           if (attemptId == attempts[mapId]) {
+            if (splitSkewPartitionWithoutMapRange) {

Review Comment:
   1. Got it





Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [celeborn]

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#issuecomment-2078878530

   > The data correctness issues, unless we can mitigate it, is what makes it risky for me.
   
   Totally agree! Correctness is the number one priority. Based on the previous discussion, correctness issues can only occur on stage rerun; I think we both agree that if this PR takes effect, we will not allow stage rerun, right?
   
   > Based on my current read, this does have correctness implications.
   I would suggest we should do either or all of the following:
   >
   > a) If recomputation happens, we should fail the stage and not allow retries - this will prevent data loss.
   
   




Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [celeborn]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#issuecomment-2080175337

   > Totally agree! Correctness is the number one priority. Based on the previous discussion, correctness issues can only occur on stage rerun; I think we both agree that if this PR takes effect, we will not allow stage rerun, right?
   
   As [currently formulated](https://github.com/apache/celeborn/pull/2373/files#diff-c8ece928ffe63a93526fd731b5d092d659255064e6880c428ae25d252324069eR68), we eagerly fail based on config: you are right, this does prevent data corruption - but it requires disabling `throwsFetchFailure` in order to leverage the benefits in this PR (and disabling `throwsFetchFailure` has its own implications, as discussed in CELEBORN-955 for Spark).
   
   Which is why I am trying to see if we can keep both benefits :-) `throwsFetchFailure` to allow for resilience in case of shuffle fetch failures, as well as mitigating the performance/timeout issues due to sorting.
   
   We don't necessarily need to explore [this proposal](https://github.com/apache/celeborn/pull/2373#issuecomment-2076295830) in the current PR btw! The changes here are flag guarded anyway, so it can be explored independently in the future.




Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [celeborn]

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#issuecomment-2096197210

   > I've summarised the above discussion to solve data issues – to the best of my understanding – in this doc and proposed an approach as well. PTAL
   > 
   > https://docs.google.com/document/d/1wOiVPp8Wp-yDgJLm6lrzgwnbrwKZbMJwE5jVXfUdemY/edit?usp=sharing
   
   Hi @s0nskar , thanks for the proposal!
   
   > OR we can treat skewed shuffleIDs as INDETERMINATE stages and let Spark handle the stages retries and abortion accordingly.
   
   I think this is the key idea, if this is doable, I would prefer this approach : )




Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [celeborn]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#discussion_r1562078042


##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -135,30 +136,37 @@ protected Compressor initialValue() {
 
   private final ReviveManager reviveManager;
 
+  private final boolean dataPushFailureTrackingEnabled;
+
   protected static class ReduceFileGroups {
     public Map<Integer, Set<PartitionLocation>> partitionGroups;
+    public Map<String, Set<PushFailedBatch>> pushFailedBatches;

Review Comment:
   QQ: This is essentially relevant only when `HARD_SPLIT`s happen, right?
   For all other cases, either we retry and it succeeds, or the task fails?
   I want to make sure I am not missing some other cases.
   





Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [incubator-celeborn]

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on code in PR #2373:
URL: https://github.com/apache/incubator-celeborn/pull/2373#discussion_r1520679253


##########
client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java:
##########
@@ -87,6 +96,41 @@ public static CelebornInputStream create(
     }
   }
 
+  public static PartitionLocation[] getSkewPartitionLocations(
+      PartitionLocation[] locations, int subPartitionSize, int subPartitionIndex) {
+    Set<PartitionLocation> sortSet =
+        new TreeSet<>(
+            (o1, o2) -> {
+              if (o1.getStorageInfo().fileSize > o2.getStorageInfo().fileSize) {
+                return 1;
+              } else if (o1.getStorageInfo().fileSize < o2.getStorageInfo().fileSize) {
+                return -1;
+              } else {
+                return o1.hashCode() - o2.hashCode();

Review Comment:
   hashCode can conflict, better to use `getUniqueId`?
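
A minimal sketch of the tie-break suggested above, using a simplified stand-in type `Loc` (hypothetical; the real `PartitionLocation` has more fields, and the format of its unique id is assumed here). It orders by file size and falls back to the unique id, so two distinct locations never compare as equal and the `TreeSet` cannot silently drop one of them. Subtracting hash codes can both collide and overflow.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

final class LocationOrderSketch {
  // Hypothetical stand-in for PartitionLocation: only the fields the ordering needs.
  static final class Loc {
    final String uniqueId;
    final long fileSize;

    Loc(String uniqueId, long fileSize) {
      this.uniqueId = uniqueId;
      this.fileSize = fileSize;
    }
  }

  // Order by file size first, then by unique id so that distinct locations never tie.
  static final Comparator<Loc> BY_SIZE_THEN_ID =
      Comparator.comparingLong((Loc l) -> l.fileSize).thenComparing((Loc l) -> l.uniqueId);

  // Returns the unique ids in the order the TreeSet iterates them.
  static List<String> sortedIds(List<Loc> locations) {
    TreeSet<Loc> sorted = new TreeSet<>(BY_SIZE_THEN_ID);
    sorted.addAll(locations);
    List<String> ids = new ArrayList<>();
    for (Loc l : sorted) {
      ids.add(l.uniqueId);
    }
    return ids;
  }

  public static void main(String[] args) {
    List<Loc> locs =
        Arrays.asList(new Loc("3-0", 20L), new Loc("1-0", 10L), new Loc("2-0", 10L));
    // Both 10-byte locations are kept; the tie is broken by id.
    System.out.println(sortedIds(locs));
  }
}
```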



##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -136,30 +137,37 @@ protected Compressor initialValue() {
 
   private final ReviveManager reviveManager;
 
+  private final boolean dataPushFailureTrackingEnabled;
+
   protected static class ReduceFileGroups {
     public Map<Integer, Set<PartitionLocation>> partitionGroups;
+    public Set<PushFailedBatch> pushFailedBatchSet;

Review Comment:
   I think it's better to organize this as `PartitionLocation#uniqueId -> Set<PushFailedBatch>` to reduce comparison



##########
common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala:
##########
@@ -31,7 +32,8 @@ import org.apache.celeborn.common.network.protocol.TransportMessage
 import org.apache.celeborn.common.protocol._
 import org.apache.celeborn.common.protocol.MessageType._
 import org.apache.celeborn.common.quota.ResourceConsumption
-import org.apache.celeborn.common.util.{PbSerDeUtils, Utils}
+import org.apache.celeborn.common.util.{JavaUtils, PbSerDeUtils, Utils}

Review Comment:
   Unused import for `JavaUtils`



##########
client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java:
##########
@@ -132,6 +176,8 @@ private static final class CelebornInputStreamImpl extends CelebornInputStream {
 
     private Map<Integer, Set<Integer>> batchesRead = new HashMap<>();
 
+    private final Set<PushFailedBatch> failedBatches;

Review Comment:
   ditto, can be organized as `PartitionLocation#uniqueId -> Set<PushFailedBatch>` to reduce comparison. We can get current PartitionLocation through `currentReader`



##########
client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java:
##########
@@ -203,6 +252,8 @@ private static final class CelebornInputStreamImpl extends CelebornInputStream {
       this.shuffleCompressionEnabled =
           !conf.shuffleCompressionCodec().equals(CompressionCodec.NONE);
       this.fetchExcludedWorkerExpireTimeout = conf.clientFetchExcludedWorkerExpireTimeout();
+      this.failedBatches = failedBatchSet;
+      this.pushShuffleFailureTrackingEnabled = conf.clientPushFailureTrackingEnabled();

Review Comment:
   I think we need to check that, if `pushShuffleFailureTrackingEnabled` is false but `endMapIndex` is not `Integer.MAX_VALUE`, an exception will be thrown.



##########
client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java:
##########
@@ -87,6 +96,41 @@ public static CelebornInputStream create(
     }
   }
 
+  public static PartitionLocation[] getSkewPartitionLocations(
+      PartitionLocation[] locations, int subPartitionSize, int subPartitionIndex) {
+    Set<PartitionLocation> sortSet =
+        new TreeSet<>(
+            (o1, o2) -> {
+              if (o1.getStorageInfo().fileSize > o2.getStorageInfo().fileSize) {
+                return 1;
+              } else if (o1.getStorageInfo().fileSize < o2.getStorageInfo().fileSize) {
+                return -1;
+              } else {
+                return o1.hashCode() - o2.hashCode();
+              }
+            });
+    sortSet.addAll(Arrays.asList(locations));
+    PartitionLocation[] orderedPartitionLocations = sortSet.toArray(new PartitionLocation[0]);
+
+    List<PartitionLocation> result = new LinkedList<>();
+
+    int step = locations.length / subPartitionSize;
+
+    // if partition location is [1,2,3,4,5,6,7,8,9,10], and skew partition split to 3 task:

Review Comment:
   Seems the logic should be like this:
   ```
       // if the partition locations are [1,2,3,4,5,6,7,8,9,10], and the skew partition is split into 3 tasks:
       // task 0: 1, 4, 7, 10
       // task 1: 2, 5, 8
       // task 2: 3, 6, 9
       for (int i = 0; i < step + 1; i++) {
         // stride by the number of sub-partitions so every location is assigned exactly once
         int index = i * subPartitionSize + subPartitionIndex;
         if (index < locations.length) {
           result.add(orderedPartitionLocations[index]);
         }
       }
   ```
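
For illustration, a round-robin assignment of this kind can be sketched on plain integers. This is only a sketch, assuming the stride between picks is `subPartitionSize` and that the locations are already sorted. `pick` is a hypothetical helper, not a method from the PR.

```java
import java.util.ArrayList;
import java.util.List;

final class RoundRobinSplitSketch {
  // Returns the elements that sub-task `subPartitionIndex` would read when
  // every sub-task takes each `subPartitionSize`-th element of `sorted`.
  static List<Integer> pick(int[] sorted, int subPartitionSize, int subPartitionIndex) {
    List<Integer> result = new ArrayList<>();
    for (int index = subPartitionIndex; index < sorted.length; index += subPartitionSize) {
      result.add(sorted[index]);
    }
    return result;
  }

  public static void main(String[] args) {
    int[] locations = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
    for (int task = 0; task < 3; task++) {
      System.out.println("task " + task + ": " + pick(locations, 3, task));
    }
  }
}
```

Every element is assigned to exactly one sub-task, regardless of whether the number of locations is a multiple of the number of sub-tasks.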



##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -56,11 +56,14 @@ import org.apache.celeborn.common.util.{JavaUtils, PbSerDeUtils, ThreadUtils, Ut
 import org.apache.celeborn.common.util.FunctionConverter._
 import org.apache.celeborn.common.util.ThreadUtils.awaitResult
 import org.apache.celeborn.common.util.Utils.UNKNOWN_APP_SHUFFLE_ID
+import org.apache.celeborn.common.write.PushFailedBatch
 
 object LifecycleManager {
   // shuffle id -> partition id -> partition locations
   type ShuffleFileGroups =
     ConcurrentHashMap[Int, ConcurrentHashMap[Integer, util.Set[PartitionLocation]]]
+  type ShufflePushFailedBatches =
+    ConcurrentHashMap[Int, ConcurrentHashMap[Integer, util.Set[PushFailedBatch]]]

Review Comment:
   Is it possible to use `shuffleId -> Set<PushFailedBatch>` instead of `shuffleId -> (mapId -> Set<PushFailedBatch>)`?



##########
common/src/main/proto/TransportMessages.proto:
##########
@@ -625,6 +636,7 @@ message PbOpenStream {
   int32 endIndex = 4;
   int32 initialCredit = 5;
   bool readLocalShuffle = 6;
+  bool shuffleDataNeedSort = 7;

Review Comment:
   IIUC, if this PR's optimization is enabled, `endIndex` will be `Int.MaxValue`, so `shuffleDataNeedSort` is unnecessary.



##########
common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala:
##########
@@ -19,6 +19,7 @@ package org.apache.celeborn.common.protocol.message
 
 import java.util
 import java.util.UUID
+import java.util.concurrent.ConcurrentHashMap

Review Comment:
   Unused import



##########
common/src/main/java/org/apache/celeborn/common/write/PushState.java:
##########
@@ -33,9 +35,12 @@ public class PushState {
   public AtomicReference<IOException> exception = new AtomicReference<>();
   private final InFlightRequestTracker inFlightRequestTracker;
 
+  private Set<PushFailedBatch> failedBatchSet;
+
   public PushState(CelebornConf conf) {
     pushBufferMaxSize = conf.clientPushBufferMaxSize();
     inFlightRequestTracker = new InFlightRequestTracker(conf, this);
+    failedBatchSet = new HashSet<>();

Review Comment:
   Seems we should use a ConcurrentHashSet here to handle concurrent adds
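
A minimal sketch of a thread-safe failed-batch set, using the JDK's `ConcurrentHashMap.newKeySet()` as one possible choice (an assumption; the PR may end up using a different concurrent set). `FailedBatch` is a simplified, hypothetical stand-in for `PushFailedBatch`.

```java
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class FailedBatchSetSketch {
  // Hypothetical stand-in for PushFailedBatch with value-based equality.
  static final class FailedBatch {
    final int mapId;
    final int attemptId;
    final int batchId;

    FailedBatch(int mapId, int attemptId, int batchId) {
      this.mapId = mapId;
      this.attemptId = attemptId;
      this.batchId = batchId;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof FailedBatch)) return false;
      FailedBatch other = (FailedBatch) o;
      return mapId == other.mapId && attemptId == other.attemptId && batchId == other.batchId;
    }

    @Override
    public int hashCode() {
      return Objects.hash(mapId, attemptId, batchId);
    }
  }

  // Adds distinct batches from several threads and returns the final set size.
  static int addConcurrently(int threads, int batchesPerThread) {
    Set<FailedBatch> failed = ConcurrentHashMap.newKeySet();
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (int t = 0; t < threads; t++) {
      final int mapId = t;
      pool.execute(() -> {
        for (int b = 0; b < batchesPerThread; b++) {
          failed.add(new FailedBatch(mapId, 0, b));
        }
      });
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
        throw new IllegalStateException("workers did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return failed.size();
  }

  public static void main(String[] args) {
    // With a concurrent set no add is lost, so the size equals threads * batchesPerThread.
    System.out.println(addConcurrently(4, 1000));
  }
}
```

A plain `HashSet` gives no such guarantee when several push callbacks add to it at the same time.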





Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [incubator-celeborn]

Posted by "wangshengjie123 (via GitHub)" <gi...@apache.org>.
wangshengjie123 commented on code in PR #2373:
URL: https://github.com/apache/incubator-celeborn/pull/2373#discussion_r1536775365


##########
client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java:
##########
@@ -87,6 +96,41 @@ public static CelebornInputStream create(
     }
   }
 
+  public static PartitionLocation[] getSkewPartitionLocations(
+      PartitionLocation[] locations, int subPartitionSize, int subPartitionIndex) {
+    Set<PartitionLocation> sortSet =
+        new TreeSet<>(
+            (o1, o2) -> {
+              if (o1.getStorageInfo().fileSize > o2.getStorageInfo().fileSize) {
+                return 1;
+              } else if (o1.getStorageInfo().fileSize < o2.getStorageInfo().fileSize) {
+                return -1;
+              } else {
+                return o1.hashCode() - o2.hashCode();
+              }
+            });
+    sortSet.addAll(Arrays.asList(locations));
+    PartitionLocation[] orderedPartitionLocations = sortSet.toArray(new PartitionLocation[0]);
+
+    List<PartitionLocation> result = new LinkedList<>();

Review Comment:
   Fixed, thanks





Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [incubator-celeborn]

Posted by "wangshengjie123 (via GitHub)" <gi...@apache.org>.
wangshengjie123 commented on code in PR #2373:
URL: https://github.com/apache/incubator-celeborn/pull/2373#discussion_r1536772966


##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -1393,7 +1414,13 @@ public void onSuccess(ByteBuffer response) {
                     Arrays.toString(partitionIds),
                     groupedBatchId,
                     Arrays.toString(batchIds));
-
+                if (dataPushFailureTrackingEnabled) {

Review Comment:
   I'm not sure whether it's possible for the master copy to succeed while the replica copy fails due to HARD_SPLIT. I will check it again





Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [incubator-celeborn]

Posted by "wangshengjie123 (via GitHub)" <gi...@apache.org>.
wangshengjie123 commented on code in PR #2373:
URL: https://github.com/apache/incubator-celeborn/pull/2373#discussion_r1536773685


##########
client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java:
##########
@@ -87,6 +96,41 @@ public static CelebornInputStream create(
     }
   }
 
+  public static PartitionLocation[] getSkewPartitionLocations(
+      PartitionLocation[] locations, int subPartitionSize, int subPartitionIndex) {
+    Set<PartitionLocation> sortSet =
+        new TreeSet<>(
+            (o1, o2) -> {
+              if (o1.getStorageInfo().fileSize > o2.getStorageInfo().fileSize) {
+                return 1;
+              } else if (o1.getStorageInfo().fileSize < o2.getStorageInfo().fileSize) {
+                return -1;
+              } else {
+                return o1.hashCode() - o2.hashCode();
+              }
+            });
+    sortSet.addAll(Arrays.asList(locations));
+    PartitionLocation[] orderedPartitionLocations = sortSet.toArray(new PartitionLocation[0]);

Review Comment:
   done





Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [incubator-celeborn]

Posted by "wangshengjie123 (via GitHub)" <gi...@apache.org>.
wangshengjie123 commented on code in PR #2373:
URL: https://github.com/apache/incubator-celeborn/pull/2373#discussion_r1536775282


##########
common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala:
##########
@@ -31,7 +32,8 @@ import org.apache.celeborn.common.network.protocol.TransportMessage
 import org.apache.celeborn.common.protocol._
 import org.apache.celeborn.common.protocol.MessageType._
 import org.apache.celeborn.common.quota.ResourceConsumption
-import org.apache.celeborn.common.util.{PbSerDeUtils, Utils}
+import org.apache.celeborn.common.util.{JavaUtils, PbSerDeUtils, Utils}

Review Comment:
   Removed, thanks





Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [celeborn]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#discussion_r1563131035


##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -1025,6 +1034,10 @@ public void onSuccess(ByteBuffer response) {
                       attemptId,
                       partitionId,
                       nextBatchId);
+                  if (dataPushFailureTrackingEnabled) {
+                    pushState.addFailedBatch(
+                        latest.getUniqueId(), new PushFailedBatch(mapId, attemptId, nextBatchId));

Review Comment:
   Thanks for clarifying !





Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [celeborn]

Posted by "wangshengjie123 (via GitHub)" <gi...@apache.org>.
wangshengjie123 commented on code in PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#discussion_r1562166228


##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -1025,6 +1034,10 @@ public void onSuccess(ByteBuffer response) {
                       attemptId,
                       partitionId,
                       nextBatchId);
+                  if (dataPushFailureTrackingEnabled) {
+                    pushState.addFailedBatch(
+                        latest.getUniqueId(), new PushFailedBatch(mapId, attemptId, nextBatchId));

Review Comment:
   The initial design intention was that any Revive could potentially lead to duplicate data reads, because a task can no longer perform deduplication at the mapId-reduce level:
   
   Before this PR, one reduce task read the data of a fixed set of mapIds, but now one mapId's data may be read by multiple sub-reduce tasks, which are partitioned by PartitionLocation. For example:
   
   - map1 writes reduce1's shuffle data in 2 batches: batch1 and batch2,
   - batch1 is pushed to PartitionLocation1 successfully,
   - but batch2 fails because only the Primary peer succeeded, so batch2 is Revived to PartitionLocation2
   - sub-reduce1 will read PartitionLocation1, which has batch1 and may also contain batch2
   - sub-reduce2 will read PartitionLocation2 and get batch2 again
   
   Therefore, all Revive-related push batches will be recorded.
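
The scenario above can be sketched as follows. This is a simplified model, not the PR's code: `Batch`, `readLocation`, and the string location ids are hypothetical. It assumes failed batches are recorded per location id, so that a reader can skip a batch that was later revived to another location.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

final class ReviveDedupSketch {
  // Hypothetical stand-in for PushFailedBatch: (mapId, attemptId, batchId).
  static final class Batch {
    final int mapId;
    final int attemptId;
    final int batchId;

    Batch(int mapId, int attemptId, int batchId) {
      this.mapId = mapId;
      this.attemptId = attemptId;
      this.batchId = batchId;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof Batch)) return false;
      Batch other = (Batch) o;
      return mapId == other.mapId && attemptId == other.attemptId && batchId == other.batchId;
    }

    @Override
    public int hashCode() {
      return Objects.hash(mapId, attemptId, batchId);
    }

    @Override
    public String toString() {
      return "batch(" + mapId + "," + attemptId + "," + batchId + ")";
    }
  }

  // Returns the batches stored in one location that the reader should consume,
  // skipping any batch recorded as failed (and therefore revived) for that location.
  static List<Batch> readLocation(
      String locationId, List<Batch> stored, Map<String, Set<Batch>> failedByLocation) {
    Set<Batch> failed = failedByLocation.getOrDefault(locationId, Collections.emptySet());
    List<Batch> consumed = new ArrayList<>();
    for (Batch b : stored) {
      if (!failed.contains(b)) {
        consumed.add(b);
      }
    }
    return consumed;
  }

  public static void main(String[] args) {
    Batch batch1 = new Batch(1, 0, 1);
    Batch batch2 = new Batch(1, 0, 2);
    // batch2 partially landed in loc-1 before being revived to loc-2.
    Map<String, Set<Batch>> failed = new HashMap<>();
    failed.put("loc-1", new HashSet<>(Collections.singletonList(batch2)));

    System.out.println(readLocation("loc-1", Arrays.asList(batch1, batch2), failed));
    System.out.println(readLocation("loc-2", Collections.singletonList(batch2), failed));
  }
}
```

Keying the failed batches by location id keeps each lookup limited to the batches relevant to the location currently being read.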





Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [celeborn]

Posted by "wangshengjie123 (via GitHub)" <gi...@apache.org>.
wangshengjie123 commented on code in PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#discussion_r1564540946


##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -135,30 +136,37 @@ protected Compressor initialValue() {
 
   private final ReviveManager reviveManager;
 
+  private final boolean dataPushFailureTrackingEnabled;
+
   protected static class ReduceFileGroups {
     public Map<Integer, Set<PartitionLocation>> partitionGroups;
+    public Map<String, Set<PushFailedBatch>> pushFailedBatches;

Review Comment:
   I'm not quite sure I fully understand your meaning. Do you mean:
   - `the data would be under replicated`: does this mean data will be lost? I think data will not be lost, because a failed batch will be retried against another PartitionLocation requested by Revive with epoch + 1.
   - `If yes, wouldn't it not be simply better to retry to the same (or perhaps updated) peer ?`: Revive is designed to avoid retrying the push to the same PartitionLocation, since retrying the same node may fail again or time out. We also want to ensure that a succeeded batch is available on both the Primary and Replica peers.
   
   If I misunderstood, could you kindly explain further and provide an example?
   





Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [celeborn]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#issuecomment-2052458444

   > seem the replication mechanism is unable to mitigate the risk of data loss in this scenario?
   
   When fetch failures are due to worker unavailability (node crash, etc) - replicas should allow the reducer to continue fetching data.
   This is more of a statistical reduction in failures, not elimination - agree.




Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [celeborn]

Posted by "pan3793 (via GitHub)" <gi...@apache.org>.
pan3793 commented on PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#issuecomment-2037870674

   @waitinfuture It seems this PR is getting attention and some discussions happened offline; we'd better update the PR description to summarize the whole design and the known issues so far




Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [incubator-celeborn]

Posted by "lyy-pineapple (via GitHub)" <gi...@apache.org>.
lyy-pineapple commented on code in PR #2373:
URL: https://github.com/apache/incubator-celeborn/pull/2373#discussion_r1519595369


##########
client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java:
##########
@@ -87,6 +96,41 @@ public static CelebornInputStream create(
     }
   }
 
+  public static PartitionLocation[] getSkewPartitionLocations(
+      PartitionLocation[] locations, int subPartitionSize, int subPartitionIndex) {
+    Set<PartitionLocation> sortSet =
+        new TreeSet<>(
+            (o1, o2) -> {
+              if (o1.getStorageInfo().fileSize > o2.getStorageInfo().fileSize) {
+                return 1;
+              } else if (o1.getStorageInfo().fileSize < o2.getStorageInfo().fileSize) {
+                return -1;
+              } else {
+                return o1.hashCode() - o2.hashCode();
+              }
+            });
+    sortSet.addAll(Arrays.asList(locations));
+    PartitionLocation[] orderedPartitionLocations = sortSet.toArray(new PartitionLocation[0]);
+
+    List<PartitionLocation> result = new LinkedList<>();
+
+    int step = locations.length / subPartitionSize;
+
+    // if partition location is [1,2,3,4,5,6,7,8,9,10], and skew partition split to 3 task:
+    // task 0: 1, 6, 7
+    // task 1: 2, 5, 8
+    // task 2: 3, 4, 9, 10
+    for (int i = 0; i < step + 1; i++) {
+      if (i % 2 == 0 && (i * 3 + subPartitionIndex) < locations.length) {

Review Comment:
   i % 2 == 0 && (i * subPartitionSize + subPartitionIndex)
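
For reference, the zigzag assignment described in the code comment above can be sketched with `subPartitionSize` in place of the hard-coded `3`. This is only an illustration under stated assumptions, not the PR's implementation: `SkewSplitSketch` and `assignedIndices` are hypothetical names, and the method works on indices into the size-ordered array rather than on `PartitionLocation` objects.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: distributes indices 0..total-1 over sub-tasks in zigzag order.
final class SkewSplitSketch {

  // Returns the indices (into a size-ordered array of `total` locations)
  // that the sub-task `subPartitionIndex` would read.
  static List<Integer> assignedIndices(int total, int subPartitionSize, int subPartitionIndex) {
    List<Integer> result = new ArrayList<>();
    // Number of rows of width subPartitionSize needed to cover all locations.
    int rows = (total + subPartitionSize - 1) / subPartitionSize;
    for (int row = 0; row < rows; row++) {
      // Even rows go left to right, odd rows go right to left.
      int offset = (row % 2 == 0) ? subPartitionIndex : subPartitionSize - 1 - subPartitionIndex;
      int index = row * subPartitionSize + offset;
      if (index < total) {
        result.add(index);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // Ten locations split across three sub-tasks, as in the code comment.
    for (int task = 0; task < 3; task++) {
      System.out.println("task " + task + ": " + assignedIndices(10, 3, task));
    }
  }
}
```

With ten locations and three sub-tasks, index `k` corresponds to value `k + 1` in the comment's example, so sub-task 0 receives indices 0, 5, 6 (values 1, 6, 7).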





Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [incubator-celeborn]

Posted by "wangshengjie123 (via GitHub)" <gi...@apache.org>.
wangshengjie123 commented on code in PR #2373:
URL: https://github.com/apache/incubator-celeborn/pull/2373#discussion_r1523248831


##########
client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java:
##########
@@ -87,6 +96,41 @@ public static CelebornInputStream create(
     }
   }
 
+  public static PartitionLocation[] getSkewPartitionLocations(
+      PartitionLocation[] locations, int subPartitionSize, int subPartitionIndex) {
+    Set<PartitionLocation> sortSet =
+        new TreeSet<>(
+            (o1, o2) -> {
+              if (o1.getStorageInfo().fileSize > o2.getStorageInfo().fileSize) {
+                return 1;
+              } else if (o1.getStorageInfo().fileSize < o2.getStorageInfo().fileSize) {
+                return -1;
+              } else {
+                return o1.hashCode() - o2.hashCode();
+              }
+            });
+    sortSet.addAll(Arrays.asList(locations));
+    PartitionLocation[] orderedPartitionLocations = sortSet.toArray(new PartitionLocation[0]);
+
+    List<PartitionLocation> result = new LinkedList<>();
+
+    int step = locations.length / subPartitionSize;
+
+    // if partition location is [1,2,3,4,5,6,7,8,9,10], and skew partition split to 3 task:

Review Comment:
   @mridulm Sorry for the late reply. Your understanding is correct, and I should optimize the logic.





Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [celeborn]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#issuecomment-2040852271

   It has been a while since I looked at this PR - but as formulated, the split into subranges is deterministic (if it is not, it should be made so).
   With that in place, this would not be an issue.
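
As an illustration of what a deterministic ordering could look like, the sketch below sorts by file size and breaks ties on a stable identifier instead of on `hashCode()`. A tie-break written as `o1.hashCode() - o2.hashCode()` depends on how `hashCode()` is implemented, and the `int` subtraction can overflow. The `Loc` class and its `uniqueId` field are hypothetical stand-ins, not Celeborn's `PartitionLocation`, and this only addresses ordering for a fixed set of locations.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class DeterministicOrderSketch {

  // Hypothetical stand-in for a partition location: a stable id plus a file size.
  static final class Loc {
    final String uniqueId;
    final long fileSize;

    Loc(String uniqueId, long fileSize) {
      this.uniqueId = uniqueId;
      this.fileSize = fileSize;
    }
  }

  // Orders by file size first, then by the stable id, so ties never depend on hashCode().
  static final Comparator<Loc> BY_SIZE_THEN_ID =
      Comparator.comparingLong((Loc l) -> l.fileSize).thenComparing((Loc l) -> l.uniqueId);

  // Returns the ids in sorted order without modifying the input list.
  static List<String> orderedIds(List<Loc> locations) {
    List<Loc> sorted = new ArrayList<>(locations);
    sorted.sort(BY_SIZE_THEN_ID);
    List<String> ids = new ArrayList<>();
    for (Loc l : sorted) {
      ids.add(l.uniqueId);
    }
    return ids;
  }

  public static void main(String[] args) {
    List<Loc> locs =
        Arrays.asList(new Loc("3-0", 20L), new Loc("1-0", 10L), new Loc("2-0", 10L));
    System.out.println(orderedIds(locs));
  }
}
```

Any input order of the same three locations yields the same result, because the two equal-sized entries are ordered by id.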




Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [celeborn]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#issuecomment-2040973788

   +CC @otterc as well.




Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [celeborn]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#issuecomment-2040970108

   Ah, I see what you mean ... `PartitionLocation` would change between retries.
   Yeah, this is a problem then - it will cause data loss. This would be a variant of SPARK-23207
   
   I will need to take another look at the PR and how it interacts with Celeborn, but if SPARK-23207 is applicable, we should not proceed down this path.
   




Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [incubator-celeborn]

Posted by "wangshengjie123 (via GitHub)" <gi...@apache.org>.
wangshengjie123 commented on code in PR #2373:
URL: https://github.com/apache/incubator-celeborn/pull/2373#discussion_r1536774558


##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -56,11 +56,14 @@ import org.apache.celeborn.common.util.{JavaUtils, PbSerDeUtils, ThreadUtils, Ut
 import org.apache.celeborn.common.util.FunctionConverter._
 import org.apache.celeborn.common.util.ThreadUtils.awaitResult
 import org.apache.celeborn.common.util.Utils.UNKNOWN_APP_SHUFFLE_ID
+import org.apache.celeborn.common.write.PushFailedBatch
 
 object LifecycleManager {
   // shuffle id -> partition id -> partition locations
   type ShuffleFileGroups =
     ConcurrentHashMap[Int, ConcurrentHashMap[Integer, util.Set[PartitionLocation]]]
+  type ShufflePushFailedBatches =
+    ConcurrentHashMap[Int, ConcurrentHashMap[Integer, util.Set[PushFailedBatch]]]

Review Comment:
   Changed the key from shuffleId to UniqueId. Done.





Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [celeborn]

Posted by "wangshengjie123 (via GitHub)" <gi...@apache.org>.
wangshengjie123 commented on code in PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#discussion_r1562113488


##########
common/src/main/java/org/apache/celeborn/common/write/PushFailedBatch.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.write;
+
+import java.io.Serializable;
+
+import com.google.common.base.Objects;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+public class PushFailedBatch implements Serializable {
+
+  private int mapId;
+  private int attemptId;
+  private int batchId;
+
+  public int getMapId() {
+    return mapId;
+  }
+
+  public void setMapId(int mapId) {
+    this.mapId = mapId;
+  }
+
+  public int getAttemptId() {
+    return attemptId;
+  }
+
+  public void setAttemptId(int attemptId) {
+    this.attemptId = attemptId;
+  }
+
+  public int getBatchId() {
+    return batchId;
+  }
+
+  public void setBatchId(int batchId) {
+    this.batchId = batchId;
+  }
+
+  public PushFailedBatch(int mapId, int attemptId, int batchId) {
+    this.mapId = mapId;
+    this.attemptId = attemptId;
+    this.batchId = batchId;
+  }

Review Comment:
   done
   



##########
common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala:
##########
@@ -279,7 +281,9 @@ object ControlMessages extends Logging {
       status: StatusCode,
       fileGroup: util.Map[Integer, util.Set[PartitionLocation]],
       attempts: Array[Int],
-      partitionIds: util.Set[Integer] = new util.HashSet[Integer]())
+      partitionIds: util.Set[Integer] = new util.HashSet[Integer](),
+      pushFailedBatches: util.Map[String, util.Set[PushFailedBatch]] =
+        new util.HashMap[String, util.Set[PushFailedBatch]]())

Review Comment:
   done
   





Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [celeborn]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#discussion_r1563133385


##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -135,30 +136,37 @@ protected Compressor initialValue() {
 
   private final ReviveManager reviveManager;
 
+  private final boolean dataPushFailureTrackingEnabled;
+
   protected static class ReduceFileGroups {
     public Map<Integer, Set<PartitionLocation>> partitionGroups;
+    public Map<String, Set<PushFailedBatch>> pushFailedBatches;

Review Comment:
   The concern I have with this is that the data would be under-replicated, right?
   If yes, wouldn't it simply be better to retry with an updated peer?
   
   If that is an option, the problem you [described above](https://github.com/apache/celeborn/pull/2373#discussion_r1562166228) is designed away as well ?





Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [celeborn]

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#issuecomment-2076301404

   > QQ: thinking out aloud, instead of this change - do we want to proactively trigger sort for reducers where we are reading a subset of mapper output (based on `ShufflePartitionsUtil`) ?
   > 
   > This will help if we are trying to mitigate the impact of reducer read timeouts, etc. It won't bring down the overall load (at worker) though.
   > 
   > On plus side, it does not suffer from the correctness issues here.
   > 
   > Thoughts ?
   
   Unfortunately, we don't know whether a partition split will be read in a map-range fashion until the read request arrives :(
   BTW, even if we figure out a way, sorting some files still increases the overall burden on the worker (maybe negligible if the worker is under low load).




Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [incubator-celeborn]

Posted by "codecov[bot] (via GitHub)" <gi...@apache.org>.
codecov[bot] commented on PR #2373:
URL: https://github.com/apache/incubator-celeborn/pull/2373#issuecomment-2001906903

   ## [Codecov](https://app.codecov.io/gh/apache/incubator-celeborn/pull/2373?dropdown=coverage&src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   Attention: Patch coverage is `8.24742%`, with `89 lines` in your changes missing coverage. Please review.
   > Project coverage is 48.59%. Comparing base [(`12c3779`)](https://app.codecov.io/gh/apache/incubator-celeborn/commit/12c3779805d7427028e91dc8067ef55ef3f0994f?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) to head [(`599be24`)](https://app.codecov.io/gh/apache/incubator-celeborn/pull/2373?dropdown=coverage&src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
   
   | [Files](https://app.codecov.io/gh/apache/incubator-celeborn/pull/2373?dropdown=coverage&src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Patch % | Lines |
   |---|---|---|
   | [...born/common/protocol/message/ControlMessages.scala](https://app.codecov.io/gh/apache/incubator-celeborn/pull/2373?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-Y29tbW9uL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvY2VsZWJvcm4vY29tbW9uL3Byb3RvY29sL21lc3NhZ2UvQ29udHJvbE1lc3NhZ2VzLnNjYWxh) | 0.00% | [38 Missing :warning: ](https://app.codecov.io/gh/apache/incubator-celeborn/pull/2373?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   | [.../apache/celeborn/common/write/PushFailedBatch.java](https://app.codecov.io/gh/apache/incubator-celeborn/pull/2373?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9jZWxlYm9ybi9jb21tb24vd3JpdGUvUHVzaEZhaWxlZEJhdGNoLmphdmE=) | 0.00% | [29 Missing :warning: ](https://app.codecov.io/gh/apache/incubator-celeborn/pull/2373?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   | [...org/apache/celeborn/common/util/PbSerDeUtils.scala](https://app.codecov.io/gh/apache/incubator-celeborn/pull/2373?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-Y29tbW9uL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvY2VsZWJvcm4vY29tbW9uL3V0aWwvUGJTZXJEZVV0aWxzLnNjYWxh) | 0.00% | [11 Missing :warning: ](https://app.codecov.io/gh/apache/incubator-celeborn/pull/2373?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   | [...g/apache/celeborn/common/protocol/StorageInfo.java](https://app.codecov.io/gh/apache/incubator-celeborn/pull/2373?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9jZWxlYm9ybi9jb21tb24vcHJvdG9jb2wvU3RvcmFnZUluZm8uamF2YQ==) | 0.00% | [6 Missing :warning: ](https://app.codecov.io/gh/apache/incubator-celeborn/pull/2373?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   | [...va/org/apache/celeborn/common/write/PushState.java](https://app.codecov.io/gh/apache/incubator-celeborn/pull/2373?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9jZWxlYm9ybi9jb21tb24vd3JpdGUvUHVzaFN0YXRlLmphdmE=) | 16.67% | [5 Missing :warning: ](https://app.codecov.io/gh/apache/incubator-celeborn/pull/2373?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   
   <details><summary>Additional details and impacted files</summary>
   
   
   ```diff
   @@            Coverage Diff             @@
   ##             main    #2373      +/-   ##
   ==========================================
   - Coverage   48.77%   48.59%   -0.18%     
   ==========================================
     Files         209      210       +1     
     Lines       13109    13200      +91     
     Branches     1134     1139       +5     
   ==========================================
   + Hits         6393     6413      +20     
   - Misses       6294     6366      +72     
   + Partials      422      421       -1     
   ```
   
   
   
   </details>
   
   [:umbrella: View full report in Codecov by Sentry](https://app.codecov.io/gh/apache/incubator-celeborn/pull/2373?dropdown=coverage&src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).   
   




Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [incubator-celeborn]

Posted by "wangshengjie123 (via GitHub)" <gi...@apache.org>.
wangshengjie123 commented on code in PR #2373:
URL: https://github.com/apache/incubator-celeborn/pull/2373#discussion_r1520752954


##########
client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java:
##########
@@ -87,6 +96,41 @@ public static CelebornInputStream create(
     }
   }
 
+  public static PartitionLocation[] getSkewPartitionLocations(
+      PartitionLocation[] locations, int subPartitionSize, int subPartitionIndex) {
+    Set<PartitionLocation> sortSet =
+        new TreeSet<>(
+            (o1, o2) -> {
+              if (o1.getStorageInfo().fileSize > o2.getStorageInfo().fileSize) {
+                return 1;
+              } else if (o1.getStorageInfo().fileSize < o2.getStorageInfo().fileSize) {
+                return -1;
+              } else {
+                return o1.hashCode() - o2.hashCode();
+              }
+            });
+    sortSet.addAll(Arrays.asList(locations));
+    PartitionLocation[] orderedPartitionLocations = sortSet.toArray(new PartitionLocation[0]);
+
+    List<PartitionLocation> result = new LinkedList<>();
+
+    int step = locations.length / subPartitionSize;
+
+    // if partition location is [1,2,3,4,5,6,7,8,9,10], and skew partition split to 3 task:
+    // task 0: 1, 6, 7
+    // task 1: 2, 5, 8
+    // task 2: 3, 4, 9, 10
+    for (int i = 0; i < step + 1; i++) {
+      if (i % 2 == 0 && (i * 3 + subPartitionIndex) < locations.length) {

Review Comment:
   Got it, that is a typo. I will update it later.





Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [celeborn]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#discussion_r1563133385


##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -135,30 +136,37 @@ protected Compressor initialValue() {
 
   private final ReviveManager reviveManager;
 
+  private final boolean dataPushFailureTrackingEnabled;
+
   protected static class ReduceFileGroups {
     public Map<Integer, Set<PartitionLocation>> partitionGroups;
+    public Map<String, Set<PushFailedBatch>> pushFailedBatches;

Review Comment:
   The concern I have with this is that the data would be under-replicated, right?
   If yes, wouldn't it simply be better to retry to the same (or perhaps updated) peer?
   
   If that is an option, the problem you [described above](https://github.com/apache/celeborn/pull/2373#discussion_r1562166228) is designed away as well ?





Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [incubator-celeborn]

Posted by "wangshengjie123 (via GitHub)" <gi...@apache.org>.
wangshengjie123 commented on code in PR #2373:
URL: https://github.com/apache/incubator-celeborn/pull/2373#discussion_r1536773557


##########
client/src/main/scala/org/apache/celeborn/client/CommitManager.scala:
##########
@@ -207,13 +209,15 @@ class CommitManager(appUniqueId: String, val conf: CelebornConf, lifecycleManage
       mapId: Int,
       attemptId: Int,
       numMappers: Int,
-      partitionId: Int = -1): (Boolean, Boolean) = {
+      partitionId: Int = -1,
+      pushFailedBatches: util.Set[PushFailedBatch] = Sets.newHashSet()): (Boolean, Boolean) = {

Review Comment:
   Got it, thanks for the advice. Done.





Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [incubator-celeborn]

Posted by "wangshengjie123 (via GitHub)" <gi...@apache.org>.
wangshengjie123 commented on PR #2373:
URL: https://github.com/apache/incubator-celeborn/pull/2373#issuecomment-2016745799

   > Thanks @wangshengjie123 nice pr! Another suggestion is better to add UT for this feature.
   
   UTs are in progress. We are testing in a cluster this week, and the UTs will be submitted later.




Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [incubator-celeborn]

Posted by "wangshengjie123 (via GitHub)" <gi...@apache.org>.
wangshengjie123 commented on code in PR #2373:
URL: https://github.com/apache/incubator-celeborn/pull/2373#discussion_r1536775449


##########
common/src/main/java/org/apache/celeborn/common/write/PushFailedBatch.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.write;
+
+import java.io.Serializable;
+
+import com.google.common.base.Objects;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+public class PushFailedBatch implements Serializable {
+
+  private int mapId;
+  private int attemptId;
+  private int batchId;
+  private int epoch;
+  private int reduceId;
+
+  public int getMapId() {
+    return mapId;
+  }
+
+  public void setMapId(int mapId) {
+    this.mapId = mapId;
+  }
+
+  public int getAttemptId() {
+    return attemptId;
+  }
+
+  public void setAttemptId(int attemptId) {
+    this.attemptId = attemptId;
+  }
+
+  public int getBatchId() {
+    return batchId;
+  }
+
+  public void setBatchId(int batchId) {
+    this.batchId = batchId;
+  }
+
+  public int getReduceId() {
+    return reduceId;
+  }
+
+  public void setReduceId(int reduceId) {
+    this.reduceId = reduceId;
+  }
+
+  public int getEpoch() {
+    return epoch;
+  }
+
+  public void setEpoch(int epoch) {
+    this.epoch = epoch;
+  }
+
+  public PushFailedBatch(int mapId, int attemptId, int batchId, int reduceId, int epoch) {
+    this.mapId = mapId;
+    this.attemptId = attemptId;
+    this.batchId = batchId;
+    this.reduceId = reduceId;
+    this.epoch = epoch;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof PushFailedBatch)) {
+      return false;
+    }
+    PushFailedBatch o = (PushFailedBatch) other;
+    return super.equals(o)
+        && mapId == o.mapId
+        && attemptId == o.attemptId
+        && batchId == o.batchId
+        && reduceId == o.reduceId
+        && epoch == o.epoch;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(super.hashCode(), mapId, attemptId, batchId, reduceId, epoch);

Review Comment:
   Fixed, thanks
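
The review comment that prompted this reply is not part of this excerpt, so the following is only a sketch of a common pattern, not the committed fix. When a class extends `Object` directly, `super.equals(o)` is a reference comparison, so a value-based `equals`/`hashCode` pair usually compares only the fields. `FailedBatchKeySketch` is a hypothetical class that mirrors the five fields shown in the diff.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical value class mirroring the fields of PushFailedBatch in the diff above.
final class FailedBatchKeySketch {
  private final int mapId;
  private final int attemptId;
  private final int batchId;
  private final int reduceId;
  private final int epoch;

  FailedBatchKeySketch(int mapId, int attemptId, int batchId, int reduceId, int epoch) {
    this.mapId = mapId;
    this.attemptId = attemptId;
    this.batchId = batchId;
    this.reduceId = reduceId;
    this.epoch = epoch;
  }

  @Override
  public boolean equals(Object other) {
    if (this == other) {
      return true;
    }
    if (!(other instanceof FailedBatchKeySketch)) {
      return false;
    }
    FailedBatchKeySketch o = (FailedBatchKeySketch) other;
    // Field-by-field comparison only; no call to Object.equals.
    return mapId == o.mapId
        && attemptId == o.attemptId
        && batchId == o.batchId
        && reduceId == o.reduceId
        && epoch == o.epoch;
  }

  @Override
  public int hashCode() {
    // Hash of the same fields used by equals, so equal objects share a hash code.
    return Objects.hash(mapId, attemptId, batchId, reduceId, epoch);
  }

  public static void main(String[] args) {
    Set<FailedBatchKeySketch> set = new HashSet<>();
    set.add(new FailedBatchKeySketch(1, 0, 7, 3, 0));
    set.add(new FailedBatchKeySketch(1, 0, 7, 3, 0));
    System.out.println(set.size());
  }
}
```

With field-based equality, two separately constructed instances carrying the same ids collapse to a single entry in a `HashSet`, which is what a set of failed batches relies on.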





Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [incubator-celeborn]

Posted by "wangshengjie123 (via GitHub)" <gi...@apache.org>.
wangshengjie123 commented on code in PR #2373:
URL: https://github.com/apache/incubator-celeborn/pull/2373#discussion_r1536775191


##########
common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala:
##########
@@ -19,6 +19,7 @@ package org.apache.celeborn.common.protocol.message
 
 import java.util
 import java.util.UUID
+import java.util.concurrent.ConcurrentHashMap

Review Comment:
   Removed, thanks





Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [celeborn]

Posted by "s0nskar (via GitHub)" <gi...@apache.org>.
s0nskar commented on PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#issuecomment-2095812142

   I've summarised the above discussion on solving the data issues, to the best of my understanding, in this doc and proposed an approach as well. PTAL
   
   https://docs.google.com/document/d/1wOiVPp8Wp-yDgJLm6lrzgwnbrwKZbMJwE5jVXfUdemY/edit?usp=sharing




Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [celeborn]

Posted by "pan3793 (via GitHub)" <gi...@apache.org>.
pan3793 commented on PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#issuecomment-2037298859

   @s0nskar Good point, this should be an issue for ResultStage, even though the ShuffleMapStage's output is deterministic.
   
   IIRC, vanilla Spark also has some limitations on stage retry cases for ResultStage when ShuffleMapStage's output is **indeterministic**. For such cases, we need to fail the job, right?




Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [celeborn]

Posted by "pan3793 (via GitHub)" <gi...@apache.org>.
pan3793 commented on PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#issuecomment-2037660279

   @s0nskar I see your point. We should always treat `ShuffleMapStage`'s output as **indeterministic** under the current skew data handling approach to avoid correctness issues.




Re: [PR] [WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files [celeborn]

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#issuecomment-2037694692

   Hi @s0nskar, thanks for raising this point; I think you are correct. It seems this PR conflicts with stage rerun.
   
   > we should always treat the previous ShuffleMapStage's output as indeterministic under the current approach to avoid correctness issues.
   
   @pan3793 Is it possible to force it to be treated as `indeterministic`?

