You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@celeborn.apache.org by "waitinfuture (via GitHub)" <gi...@apache.org> on 2023/02/13 15:09:19 UTC

[GitHub] [incubator-celeborn] waitinfuture opened a new pull request, #1232: [CELEBORN-295] Add double buffer for sort pusher

waitinfuture opened a new pull request, #1232:
URL: https://github.com/apache/incubator-celeborn/pull/1232

   <!--
   Thanks for sending a pull request!  Here are some tips for you:
     - Make sure the PR title start w/ a JIRA ticket, e.g. '[CELEBORN-XXXX] Your PR title ...'.
     - Be sure to keep the PR description updated to reflect all changes.
     - Please write your PR title to summarize what this PR proposes.
     - If possible, provide a concise example to reproduce the issue for a faster review.
   -->
   
   ### What changes were proposed in this pull request?
   Add double buffer for SortBasedShuffleWriter to pipeline task execution and data pushing
   
   
   ### Why are the changes needed?
   Currently sort based pusher is generally slower than hash based pusher, the lack of pipeline is perhaps one readon
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   UT and perf test
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] FMX commented on a diff in pull request #1232: [CELEBORN-295] Pipeline sort pusher by double-buffering

Posted by "FMX (via GitHub)" <gi...@apache.org>.
FMX commented on code in PR #1232:
URL: https://github.com/apache/incubator-celeborn/pull/1232#discussion_r1109440008


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java:
##########
@@ -219,6 +230,39 @@ public void insertRecord(
       pageCursor += recordSize;
     }
     inMemSorter.insertRecord(recordAddress, partitionId);
+
+    return true;
+  }
+
+  public void triggerPush() throws IOException {
+    dataPusher.checkException();
+    if (pushing) {
+      throw new IOException("Re-trigger push!");
+    }
+    pushing = true;
+    Thread thread =
+        new Thread(
+            () -> {
+              try {
+                pushData();
+                pushing = false;
+              } catch (IOException ie) {
+                dataPusher.setException(ie);
+              }
+            });
+    thread.start();
+  }
+
+  public void waitPushFinish() throws IOException {
+    dataPusher.checkException();
+    while (pushing) {
+      try {
+        Thread.sleep(100);

Review Comment:
   I think sync methods "pushData" and "waitPushFinish" could be enough. If a thread is pushing, another thread must wait until the previous thread is done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] pan3793 commented on a diff in pull request #1232: [CELEBORN-295] Optimize data push

Posted by "pan3793 (via GitHub)" <gi...@apache.org>.
pan3793 commented on code in PR #1232:
URL: https://github.com/apache/incubator-celeborn/pull/1232#discussion_r1116628342


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java:
##########
@@ -92,6 +101,19 @@ public SortBasedPusher(
     this.taskAttemptId = taskAttemptId;
     this.numMappers = numMappers;
     this.numPartitions = numPartitions;
+
+    shuffledPartitions = new int[numPartitions];
+    reverseShuffledPartitions = new int[numPartitions];
+    for (int i = 0; i < numPartitions; i++) {
+      shuffledPartitions[i] = i;
+    }
+    if (conf.pushSortRandomizePartitionIdEnabled()) {
+      JavaUtils.randomizeInPlace(shuffledPartitions);
+      for (int i = 0; i < numPartitions; i++) {
+        reverseShuffledPartitions[shuffledPartitions[i]] = i;

Review Comment:
   the "reverse" is confusing, is there terminology for such a calculation? cc @nafiyAix 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #1232: [CELEBORN-295] Optimize data push

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on code in PR #1232:
URL: https://github.com/apache/incubator-celeborn/pull/1232#discussion_r1271232816


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java:
##########
@@ -316,11 +379,7 @@ private void allocateMemoryForRecordIfNecessary(int required) throws IOException
 
   @Override
   public long spill(long l, MemoryConsumer memoryConsumer) throws IOException {
-    logger.info("Pushdata in spill, memory used " + getUsed());
-    if (getUsed() != 0) {
-      logger.info("Pushdata is not empty , do push.");
-      return pushData();
-    }
+    logger.warn("SortBasedPusher not support spill yet");

Review Comment:
   > 
   
   Hi @zuston , thanks for your attention on this code!
   I reviewed the code again and agree with you and @cfmcgrady that `pushData` can be called in `spill`. It'll be
   great if you could raise PRs to improve this (and other places :D)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] AngersZhuuuu commented on a diff in pull request #1232: [CELEBORN-295] Optimize data push

Posted by "AngersZhuuuu (via GitHub)" <gi...@apache.org>.
AngersZhuuuu commented on code in PR #1232:
URL: https://github.com/apache/incubator-celeborn/pull/1232#discussion_r1118318785


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java:
##########
@@ -92,6 +104,13 @@ public SortBasedPusher(
     this.taskAttemptId = taskAttemptId;
     this.numMappers = numMappers;
     this.numPartitions = numPartitions;
+
+    if (conf.pushSortRandomizePartitionIdEnabled()) {

Review Comment:
   Also prefer a separated pr for this



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] AngersZhuuuu merged pull request #1232: [CELEBORN-295] Optimize data push

Posted by "AngersZhuuuu (via GitHub)" <gi...@apache.org>.
AngersZhuuuu merged PR #1232:
URL: https://github.com/apache/incubator-celeborn/pull/1232


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] zuston commented on a diff in pull request #1232: [CELEBORN-295] Optimize data push

Posted by "zuston (via GitHub)" <gi...@apache.org>.
zuston commented on code in PR #1232:
URL: https://github.com/apache/incubator-celeborn/pull/1232#discussion_r1270179675


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java:
##########
@@ -316,11 +379,7 @@ private void allocateMemoryForRecordIfNecessary(int required) throws IOException
 
   @Override
   public long spill(long l, MemoryConsumer memoryConsumer) throws IOException {
-    logger.info("Pushdata in spill, memory used " + getUsed());
-    if (getUsed() != 0) {
-      logger.info("Pushdata is not empty , do push.");
-      return pushData();
-    }
+    logger.warn("SortBasedPusher not support spill yet");

Review Comment:
   When surfacing this part code, I also have similar questions.
   
   > First, when spill is called, we want it truly free memory, but Celeborn's pushData is async, which means even though freeMemory is called inside pushData, the data may still be held in DataPusher before sent to wire. I understand this can be discussed that it's out of Spark's UnifiedMemoryManager.
   
   Async is not a problem when using `CompleteFuture` . Memory free could be called in `spill()` operation
   
   > the thread model has changed. 
   
   From my prospective, just allowing the current memory consumer to trigger spill can solve most problems.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [CELEBORN-295] Optimize data push [incubator-celeborn]

Posted by "AngersZhuuuu (via GitHub)" <gi...@apache.org>.
AngersZhuuuu commented on code in PR #1232:
URL: https://github.com/apache/incubator-celeborn/pull/1232#discussion_r1402018142


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java:
##########
@@ -316,11 +379,7 @@ private void allocateMemoryForRecordIfNecessary(int required) throws IOException
 
   @Override
   public long spill(long l, MemoryConsumer memoryConsumer) throws IOException {
-    logger.info("Pushdata in spill, memory used " + getUsed());
-    if (getUsed() != 0) {
-      logger.info("Pushdata is not empty , do push.");
-      return pushData();
-    }
+    logger.warn("SortBasedPusher not support spill yet");

Review Comment:
   We meet a case of NPE in old code, I think after this pr won't meet this issue. If keep `pushData` here, should add some loc to keep it safe
   ```
   java.lang.NullPointerException
   at org.apache.spark.memory.TaskMemoryManager.getPage(TaskMemoryManager.java:407)
   at org.apache.spark.shuffle.rss.SortBasedPusher.pushData(SortBasedPusher.java:155)
   at org.apache.spark.shuffle.rss.SortBasedPusher.spill(SortBasedPusher.java:317)
   at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:177)
   at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:297)
   at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:116)
   at org.apache.spark.sql.execution.python.HybridRowQueue.createNewQueue(RowQueue.scala:228)
   at org.apache.spark.sql.execution.python.HybridRowQueue.add(RowQueue.scala:251)
   at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$10(EvalPythonExec.scala:125)
   at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
   at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
   at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1161)
   at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1176)
   at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1214)
   at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] pan3793 commented on a diff in pull request #1232: [CELEBORN-295] Optimize data push

Posted by "pan3793 (via GitHub)" <gi...@apache.org>.
pan3793 commented on code in PR #1232:
URL: https://github.com/apache/incubator-celeborn/pull/1232#discussion_r1116628814


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java:
##########
@@ -92,6 +101,19 @@ public SortBasedPusher(
     this.taskAttemptId = taskAttemptId;
     this.numMappers = numMappers;
     this.numPartitions = numPartitions;
+
+    shuffledPartitions = new int[numPartitions];
+    reverseShuffledPartitions = new int[numPartitions];
+    for (int i = 0; i < numPartitions; i++) {
+      shuffledPartitions[i] = i;
+    }
+    if (conf.pushSortRandomizePartitionIdEnabled()) {
+      JavaUtils.randomizeInPlace(shuffledPartitions);
+      for (int i = 0; i < numPartitions; i++) {
+        reverseShuffledPartitions[shuffledPartitions[i]] = i;

Review Comment:
   I can image "inverted", but I'm not sure



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #1232: [CELEBORN-295] Pipeline sort pusher by double-buffering

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on code in PR #1232:
URL: https://github.com/apache/incubator-celeborn/pull/1232#discussion_r1107062790


##########
client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java:
##########
@@ -70,7 +70,8 @@
   private final int numPartitions;
 
   private final long pushBufferMaxSize;
-  private SortBasedPusher sortBasedPusher;
+  private SortBasedPusher[] pushers = new SortBasedPusher[2];

Review Comment:
   Double buffering typically uses two buffers



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] codecov[bot] commented on pull request #1232: [CELEBORN-295] Add double buffer for sort pusher

Posted by "codecov[bot] (via GitHub)" <gi...@apache.org>.
codecov[bot] commented on PR #1232:
URL: https://github.com/apache/incubator-celeborn/pull/1232#issuecomment-1428124620

   # [Codecov](https://codecov.io/gh/apache/incubator-celeborn/pull/1232?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#1232](https://codecov.io/gh/apache/incubator-celeborn/pull/1232?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (64d751e) into [main](https://codecov.io/gh/apache/incubator-celeborn/commit/2068e6ae377ca7d7936b9f2aff1b7363ec1bf064?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (2068e6a) will **decrease** coverage by `0.03%`.
   > The diff coverage is `0.00%`.
   
   > :exclamation: Current head 64d751e differs from pull request most recent head d83ba3f. Consider uploading reports for the commit d83ba3f to get more accurate results
   
   ```diff
   @@             Coverage Diff              @@
   ##               main    #1232      +/-   ##
   ============================================
   - Coverage     27.24%   27.21%   -0.03%     
   + Complexity      808      804       -4     
   ============================================
     Files           212      212              
     Lines         17997    17999       +2     
     Branches       1964     1964              
   ============================================
   - Hits           4902     4896       -6     
   - Misses        12772    12776       +4     
   - Partials        323      327       +4     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-celeborn/pull/1232?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...a/org/apache/celeborn/client/write/DataPusher.java](https://codecov.io/gh/apache/incubator-celeborn/pull/1232?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9jZWxlYm9ybi9jbGllbnQvd3JpdGUvRGF0YVB1c2hlci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...celeborn/service/deploy/master/SlotsAllocator.java](https://codecov.io/gh/apache/incubator-celeborn/pull/1232?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-bWFzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9jZWxlYm9ybi9zZXJ2aWNlL2RlcGxveS9tYXN0ZXIvU2xvdHNBbGxvY2F0b3IuamF2YQ==) | `69.27% <0.00%> (-2.45%)` | :arrow_down: |
   
   :mega: We’re building smart automated test selection to slash your CI/CD build times. [Learn more](https://about.codecov.io/iterative-testing/?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] pan3793 commented on a diff in pull request #1232: [CELEBORN-295] Optimize data push

Posted by "pan3793 (via GitHub)" <gi...@apache.org>.
pan3793 commented on code in PR #1232:
URL: https://github.com/apache/incubator-celeborn/pull/1232#discussion_r1116035865


##########
docs/configuration/worker.md:
##########
@@ -87,6 +87,7 @@ license: |
 | celeborn.worker.replicate.fastFail.duration | 60s | If a replicate request not replied during the duration, worker will mark the replicate data request as failed. | 0.2.0 | 
 | celeborn.worker.replicate.io.threads | &lt;undefined&gt; | Netty IO thread number of worker to replicate shuffle data. The default threads number is the number of flush thread. | 0.2.0 | 
 | celeborn.worker.replicate.port | 0 | Server port for Worker to receive replicate data request from other Workers. | 0.2.0 | 
+| celeborn.worker.replicate.randomConnection | true | Whether worker will create random connection to peer when replicate data. | 0.2.1 | 

Review Comment:
   What does "random connection" mean?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] pan3793 commented on a diff in pull request #1232: [CELEBORN-295] Optimize data push

Posted by "pan3793 (via GitHub)" <gi...@apache.org>.
pan3793 commented on code in PR #1232:
URL: https://github.com/apache/incubator-celeborn/pull/1232#discussion_r1116715910


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java:
##########
@@ -92,6 +101,19 @@ public SortBasedPusher(
     this.taskAttemptId = taskAttemptId;
     this.numMappers = numMappers;
     this.numPartitions = numPartitions;
+
+    shuffledPartitions = new int[numPartitions];
+    reverseShuffledPartitions = new int[numPartitions];
+    for (int i = 0; i < numPartitions; i++) {
+      shuffledPartitions[i] = i;
+    }
+    if (conf.pushSortRandomizePartitionIdEnabled()) {
+      JavaUtils.randomizeInPlace(shuffledPartitions);
+      for (int i = 0; i < numPartitions; i++) {
+        reverseShuffledPartitions[shuffledPartitions[i]] = i;

Review Comment:
   we can call it "invertedShuffledPartitions"
   
   https://www.pepcoding.com/resources/online-java-foundation/function-and-arrays/inverse_of_an_array/topic



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] AngersZhuuuu commented on a diff in pull request #1232: [CELEBORN-295] Optimize data push

Posted by "AngersZhuuuu (via GitHub)" <gi...@apache.org>.
AngersZhuuuu commented on code in PR #1232:
URL: https://github.com/apache/incubator-celeborn/pull/1232#discussion_r1118244664


##########
client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java:
##########
@@ -216,12 +272,26 @@ private void write0(scala.collection.Iterator iterator) throws IOException {
         pushGiantRecord(partitionId, serBuffer.getBuf(), serializedRecordSize);
       } else {
         long insertStartTime = System.nanoTime();
-        sortBasedPusher.insertRecord(
-            serBuffer.getBuf(),
-            Platform.BYTE_ARRAY_OFFSET,
-            serializedRecordSize,
-            partitionId,
-            false);
+        boolean success =
+            currentPusher.insertRecord(
+                serBuffer.getBuf(),
+                Platform.BYTE_ARRAY_OFFSET,
+                serializedRecordSize,
+                partitionId,
+                false);
+        if (!success) {
+          pushAndSwitch();
+          success =
+              currentPusher.insertRecord(
+                  serBuffer.getBuf(),
+                  Platform.BYTE_ARRAY_OFFSET,
+                  serializedRecordSize,
+                  partitionId,
+                  false);
+          if (!success) {
+            throw new IOException("Unable to push after switching pusher!");

Review Comment:
   `CelebornIOException `



##########
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java:
##########
@@ -111,88 +133,104 @@ public SortBasedPusher(
             mapStatusLengths);
 
     pushBufferMaxSize = conf.pushBufferMaxSize();
-    pushSortMemoryThreshold = conf.pushSortMemoryThreshold();
+    this.pushSortMemoryThreshold = pushSortMemoryThreshold;
 
-    inMemSorter = new ShuffleInMemorySorter(this, 4 * 1024 * 1024);
+    int initialSize = Math.min((int) pushSortMemoryThreshold / 8, 1024 * 1024);
+    inMemSorter = new ShuffleInMemorySorter(this, initialSize);
+    this.sharedPushLock = sharedPushLock;
+    this.executorService = executorService;
   }
 
-  /**
-   * @return bytes of memory freed
-   * @throws IOException
-   */
   public long pushData() throws IOException {
-    final ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords =
-        inMemSorter.getSortedIterator();
-
-    byte[] dataBuf = new byte[pushBufferMaxSize];
-    int offSet = 0;
-    int currentPartition = -1;
-    while (sortedRecords.hasNext()) {
-      sortedRecords.loadNext();
-      final int partition = sortedRecords.packedRecordPointer.getPartitionId();
-      assert (partition >= currentPartition);
-      if (partition != currentPartition) {
-        if (currentPartition == -1) {
-          currentPartition = partition;
-        } else {
-          int bytesWritten =
-              rssShuffleClient.mergeData(
-                  appId,
-                  shuffleId,
-                  mapId,
-                  attemptNumber,
-                  currentPartition,
-                  dataBuf,
-                  0,
-                  offSet,
-                  numMappers,
-                  numPartitions);
-          mapStatusLengths[currentPartition].add(bytesWritten);
-          afterPush.accept(bytesWritten);
-          currentPartition = partition;
+    // pushData should be synchronized between pushers
+    synchronized (sharedPushLock) {
+      final ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords =
+          inMemSorter.getSortedIterator();
+
+      byte[] dataBuf = new byte[pushBufferMaxSize];
+      int offSet = 0;
+      int currentPartition = -1;
+      while (sortedRecords.hasNext()) {
+        sortedRecords.loadNext();
+        final int partition =
+            shuffledPartitions != null
+                ? inversedShuffledPartitions[sortedRecords.packedRecordPointer.getPartitionId()]
+                : sortedRecords.packedRecordPointer.getPartitionId();
+        if (partition != currentPartition) {
+          if (currentPartition == -1) {
+            currentPartition = partition;
+          } else {
+            int bytesWritten =
+                rssShuffleClient.mergeData(
+                    appId,
+                    shuffleId,
+                    mapId,
+                    attemptNumber,
+                    currentPartition,
+                    dataBuf,
+                    0,
+                    offSet,
+                    numMappers,
+                    numPartitions);
+            mapStatusLengths[currentPartition].add(bytesWritten);
+            afterPush.accept(bytesWritten);
+            currentPartition = partition;
+            offSet = 0;
+          }
+        }
+        final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();
+        final Object recordPage = taskMemoryManager.getPage(recordPointer);
+        final long recordOffsetInPage = taskMemoryManager.getOffsetInPage(recordPointer);
+        int recordSize = UnsafeAlignedOffset.getSize(recordPage, recordOffsetInPage);
+
+        if (offSet + recordSize > dataBuf.length) {
+          dataPusher.addTask(partition, dataBuf, offSet);
           offSet = 0;
         }
+
+        long recordReadPosition = recordOffsetInPage + uaoSize;
+        Platform.copyMemory(
+            recordPage,
+            recordReadPosition,
+            dataBuf,
+            Platform.BYTE_ARRAY_OFFSET + offSet,
+            recordSize);
+        offSet += recordSize;
       }
-      final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();
-      final Object recordPage = taskMemoryManager.getPage(recordPointer);
-      final long recordOffsetInPage = taskMemoryManager.getOffsetInPage(recordPointer);
-      int recordSize = UnsafeAlignedOffset.getSize(recordPage, recordOffsetInPage);
-
-      if (offSet + recordSize > dataBuf.length) {
-        dataPusher.addTask(partition, dataBuf, offSet);
-        offSet = 0;
+      if (offSet > 0) {
+        dataPusher.addTask(currentPartition, dataBuf, offSet);
       }
 
-      long recordReadPosition = recordOffsetInPage + uaoSize;
-      Platform.copyMemory(
-          recordPage, recordReadPosition, dataBuf, Platform.BYTE_ARRAY_OFFSET + offSet, recordSize);
-      offSet += recordSize;
-    }
-    if (offSet > 0) {
-      dataPusher.addTask(currentPartition, dataBuf, offSet);
-    }
+      long freedBytes = freeMemory();
+      inMemSorter.freeMemory();
 
-    long freedBytes = freeMemory();
-    inMemSorter.freeMemory();
-    return freedBytes;
+      return freedBytes;
+    }
   }
 
-  public void insertRecord(
+  /**
+   * @param recordBase
+   * @param recordOffset
+   * @param recordSize
+   * @param partitionId
+   * @param copySize
+   * @return false if reaches capacity threshold
+   * @throws IOException
+   */

Review Comment:
   Remove the comment?



##########
common/src/main/scala/org/apache/celeborn/common/meta/WorkerPartitionLocationInfo.scala:
##########
@@ -27,21 +28,30 @@ import org.apache.celeborn.common.protocol.PartitionLocation
 
 class WorkerPartitionLocationInfo extends Logging {
 
-  // key: ShuffleKey, values: (partitionId -> partition locations)
-  type PartitionInfo = util.HashMap[String, util.Map[Int, util.List[PartitionLocation]]]
+  // key: ShuffleKey, values: (partitionId -> (encodedPartitionId -> PartitionLocation))
+  type PartitionInfo = ConcurrentHashMap[String, ConcurrentHashMap[Long, PartitionLocation]]

Review Comment:
   We'd better do this in separated PR



##########
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java:
##########
@@ -218,7 +256,47 @@ public void insertRecord(
       Platform.copyMemory(recordBase, recordOffset, base, pageCursor, recordSize);
       pageCursor += recordSize;
     }
-    inMemSorter.insertRecord(recordAddress, partitionId);
+    if (shuffledPartitions != null) {
+      inMemSorter.insertRecord(recordAddress, shuffledPartitions[partitionId]);
+    } else {
+      inMemSorter.insertRecord(recordAddress, partitionId);
+    }
+
+    return true;
+  }
+
+  public void triggerPush() throws IOException {
+    asyncPushing = true;
+    dataPusher.checkException();
+    executorService.submit(
+        new Runnable() {
+          @Override
+          public void run() {
+            try {
+              pushData();
+              asyncPushing = false;
+            } catch (IOException ie) {
+              dataPusher.setException(ie);
+            }
+          }
+        });
+  }
+
+  /**
+   * Since this method and pushData() are synchronized When this method returns, it means pushData
+   * has released lock
+   *
+   * @throws IOException
+   */
+  public void waitPushFinish() throws IOException {
+    dataPusher.checkException();
+    while (asyncPushing) {
+      try {
+        Thread.sleep(50);

Review Comment:
   Should we enable customize the sleep interval?



##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -1857,6 +1860,17 @@ object CelebornConf extends Logging {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("60s")
 
+  val WORKER_REPLICATE_RANDOM_CONNECTION: ConfigEntry[Boolean] =
+    buildConf("celeborn.worker.replicate.randomConnection")

Review Comment:
   `celeborn.worker.replicate.randomConnection.enabled`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] pan3793 commented on a diff in pull request #1232: [CELEBORN-295] Optimize data push

Posted by "pan3793 (via GitHub)" <gi...@apache.org>.
pan3793 commented on code in PR #1232:
URL: https://github.com/apache/incubator-celeborn/pull/1232#discussion_r1116020028


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java:
##########
@@ -52,6 +54,7 @@ public class SortBasedPusher extends MemoryConsumer {
   private final int pushBufferMaxSize;
   private final long pushSortMemoryThreshold;
   final int uaoSize = UnsafeAlignedOffset.getUaoSize();
+  final long bytes8K = Utils.byteStringAsBytes("8k");

Review Comment:
   static



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] waitinfuture commented on pull request #1232: [CELEBORN-295] Pipeline sort pusher by double-buffering

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on PR #1232:
URL: https://github.com/apache/incubator-celeborn/pull/1232#issuecomment-1439772604

   ping @FMX @RexXiong @AngersZhuuuu please review this pr


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #1232: [CELEBORN-295] Optimize data push

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on code in PR #1232:
URL: https://github.com/apache/incubator-celeborn/pull/1232#discussion_r1243424798


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java:
##########
@@ -316,11 +379,7 @@ private void allocateMemoryForRecordIfNecessary(int required) throws IOException
 
   @Override
   public long spill(long l, MemoryConsumer memoryConsumer) throws IOException {
-    logger.info("Pushdata in spill, memory used " + getUsed());
-    if (getUsed() != 0) {
-      logger.info("Pushdata is not empty , do push.");
-      return pushData();
-    }
+    logger.warn("SortBasedPusher not support spill yet");

Review Comment:
   Sorry this reply was pending since May 17...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] FMX commented on a diff in pull request #1232: [CELEBORN-295] Pipeline sort pusher by double-buffering

Posted by "FMX (via GitHub)" <gi...@apache.org>.
FMX commented on code in PR #1232:
URL: https://github.com/apache/incubator-celeborn/pull/1232#discussion_r1109416320


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java:
##########
@@ -219,6 +230,39 @@ public void insertRecord(
       pageCursor += recordSize;
     }
     inMemSorter.insertRecord(recordAddress, partitionId);
+
+    return true;
+  }
+
+  public void triggerPush() throws IOException {
+    dataPusher.checkException();
+    if (pushing) {
+      throw new IOException("Re-trigger push!");
+    }
+    pushing = true;
+    Thread thread =
+        new Thread(
+            () -> {
+              try {
+                pushData();
+                pushing = false;
+              } catch (IOException ie) {
+                dataPusher.setException(ie);
+              }
+            });
+    thread.start();
+  }
+
+  public void waitPushFinish() throws IOException {
+    dataPusher.checkException();
+    while (pushing) {
+      try {
+        Thread.sleep(100);

Review Comment:
   A thread is pushing data while another is querying its state. Change pushing to lock could be enough to avoid waiting.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] pan3793 commented on a diff in pull request #1232: [CELEBORN-295] Optimize data push

Posted by "pan3793 (via GitHub)" <gi...@apache.org>.
pan3793 commented on code in PR #1232:
URL: https://github.com/apache/incubator-celeborn/pull/1232#discussion_r1116717549


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java:
##########
@@ -92,6 +101,19 @@ public SortBasedPusher(
     this.taskAttemptId = taskAttemptId;
     this.numMappers = numMappers;
     this.numPartitions = numPartitions;
+
+    shuffledPartitions = new int[numPartitions];
+    reverseShuffledPartitions = new int[numPartitions];
+    for (int i = 0; i < numPartitions; i++) {
+      shuffledPartitions[i] = i;
+    }
+    if (conf.pushSortRandomizePartitionIdEnabled()) {
+      JavaUtils.randomizeInPlace(shuffledPartitions);
+      for (int i = 0; i < numPartitions; i++) {
+        reverseShuffledPartitions[shuffledPartitions[i]] = i;

Review Comment:
   And we can assemble it in a method called "inverse", it's easy for testing and reusing



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] pan3793 commented on a diff in pull request #1232: [CELEBORN-295] Optimize data push

Posted by "pan3793 (via GitHub)" <gi...@apache.org>.
pan3793 commented on code in PR #1232:
URL: https://github.com/apache/incubator-celeborn/pull/1232#discussion_r1116014783


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java:
##########
@@ -92,6 +101,22 @@ public SortBasedPusher(
     this.taskAttemptId = taskAttemptId;
     this.numMappers = numMappers;
     this.numPartitions = numPartitions;
+
+    shuffledPartitions = new int[numPartitions];
+    reverseShuffledPartitions = new int[numPartitions];
+    ArrayList<Integer> list = new ArrayList(numPartitions);
+    for (int i = 0; i < numPartitions; i++) {
+      list.add(i);
+    }
+    if (conf.pushSortRandomizePartitionIdEnabled()) {
+      Collections.shuffle(list);

Review Comment:
   the logic can be simplified by `Utils#randomizeInPlace` 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] pan3793 commented on a diff in pull request #1232: [CELEBORN-295] Optimize data push

Posted by "pan3793 (via GitHub)" <gi...@apache.org>.
pan3793 commented on code in PR #1232:
URL: https://github.com/apache/incubator-celeborn/pull/1232#discussion_r1116018196


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java:
##########
@@ -63,6 +66,10 @@ public class SortBasedPusher extends MemoryConsumer {
   CelebornConf conf;
   Consumer<Integer> afterPush;
   LongAdder[] mapStatusLengths;
+  Object globalPushLock;

Review Comment:
   maybe `static` as you say "global"?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] FMX commented on a diff in pull request #1232: [CELEBORN-295] Pipeline sort pusher by double-buffering

Posted by "FMX (via GitHub)" <gi...@apache.org>.
FMX commented on code in PR #1232:
URL: https://github.com/apache/incubator-celeborn/pull/1232#discussion_r1115552290


##########
build/make-distribution.sh:
##########
@@ -127,7 +127,7 @@ function build_service {
   # Store the command as an array because $MVN variable might have spaces in it.
   # Normal quoting tricks don't work.
   # See: http://mywiki.wooledge.org/BashFAQ/050
-  BUILD_COMMAND=("$MVN" clean package -DskipTests $@)

Review Comment:
   Clean is necessary. It will clean all remaining build files and create a reproducible result.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] cfmcgrady commented on a diff in pull request #1232: [CELEBORN-295] Optimize data push

Posted by "cfmcgrady (via GitHub)" <gi...@apache.org>.
cfmcgrady commented on code in PR #1232:
URL: https://github.com/apache/incubator-celeborn/pull/1232#discussion_r1251928641


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java:
##########
@@ -316,11 +379,7 @@ private void allocateMemoryForRecordIfNecessary(int required) throws IOException
 
   @Override
   public long spill(long l, MemoryConsumer memoryConsumer) throws IOException {
-    logger.info("Pushdata in spill, memory used " + getUsed());
-    if (getUsed() != 0) {
-      logger.info("Pushdata is not empty , do push.");
-      return pushData();
-    }
+    logger.warn("SortBasedPusher not support spill yet");

Review Comment:
   After thoroughly understanding the `pushData()` logic, the `DataPusher`'s buffer is preallocated, which means invoking the `pushData()` method does not result in increased heap memory consumption by the `DataPusher`. I think it's worthy to call `pushData()` to free memory for the  Spark when pipeline is disabled
   
   https://github.com/apache/incubator-celeborn/blob/693172d0bd0614674405f6af863f5f0a361b5f3a/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java#L83-L85



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] pan3793 commented on a diff in pull request #1232: [CELEBORN-295] Pipeline sort pusher by double-buffering

Posted by "pan3793 (via GitHub)" <gi...@apache.org>.
pan3793 commented on code in PR #1232:
URL: https://github.com/apache/incubator-celeborn/pull/1232#discussion_r1115558287


##########
build/make-distribution.sh:
##########
@@ -127,7 +127,7 @@ function build_service {
   # Store the command as an array because $MVN variable might have spaces in it.
   # Normal quoting tricks don't work.
   # See: http://mywiki.wooledge.org/BashFAQ/050
-  BUILD_COMMAND=("$MVN" clean package -DskipTests $@)

Review Comment:
   +1, please reserve clean, dirty cache may cause fatal issues



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] FMX commented on a diff in pull request #1232: [CELEBORN-295] Pipeline sort pusher by double-buffering

Posted by "FMX (via GitHub)" <gi...@apache.org>.
FMX commented on code in PR #1232:
URL: https://github.com/apache/incubator-celeborn/pull/1232#discussion_r1106604289


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java:
##########
@@ -219,6 +230,39 @@ public void insertRecord(
       pageCursor += recordSize;
     }
     inMemSorter.insertRecord(recordAddress, partitionId);
+
+    return true;
+  }
+
+  public void triggerPush() throws IOException {
+    dataPusher.checkException();
+    if (pushing) {
+      throw new IOException("Re-trigger push!");
+    }
+    pushing = true;
+    Thread thread =
+        new Thread(
+            () -> {
+              try {
+                pushData();
+                pushing = false;
+              } catch (IOException ie) {
+                dataPusher.setException(ie);
+              }
+            });
+    thread.start();
+  }
+
+  public void waitPushFinish() throws IOException {
+    dataPusher.checkException();
+    while (pushing) {
+      try {
+        Thread.sleep(100);

Review Comment:
   This waiting might waste time.



##########
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java:
##########
@@ -219,6 +230,39 @@ public void insertRecord(
       pageCursor += recordSize;
     }
     inMemSorter.insertRecord(recordAddress, partitionId);
+
+    return true;
+  }
+
+  public void triggerPush() throws IOException {
+    dataPusher.checkException();
+    if (pushing) {
+      throw new IOException("Re-trigger push!");
+    }
+    pushing = true;
+    Thread thread =
+        new Thread(
+            () -> {
+              try {
+                pushData();
+                pushing = false;
+              } catch (IOException ie) {
+                dataPusher.setException(ie);
+              }
+            });
+    thread.start();
+  }
+
+  public void waitPushFinish() throws IOException {
+    dataPusher.checkException();
+    while (pushing) {
+      try {
+        Thread.sleep(100);

Review Comment:
   I think waiting is not a good solution here. Maybe lock or sync will be better here. 



##########
client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java:
##########
@@ -70,7 +70,8 @@
   private final int numPartitions;
 
   private final long pushBufferMaxSize;
-  private SortBasedPusher sortBasedPusher;
+  private SortBasedPusher[] pushers = new SortBasedPusher[2];

Review Comment:
   Two data pushers might be not enough, maybe adding a configuration will be better.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #1232: [CELEBORN-295] Optimize data push

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on code in PR #1232:
URL: https://github.com/apache/incubator-celeborn/pull/1232#discussion_r1114085610


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java:
##########
@@ -219,6 +230,39 @@ public void insertRecord(
       pageCursor += recordSize;
     }
     inMemSorter.insertRecord(recordAddress, partitionId);
+
+    return true;
+  }
+
+  public void triggerPush() throws IOException {
+    dataPusher.checkException();
+    if (pushing) {
+      throw new IOException("Re-trigger push!");
+    }
+    pushing = true;
+    Thread thread =
+        new Thread(
+            () -> {
+              try {
+                pushData();
+                pushing = false;
+              } catch (IOException ie) {
+                dataPusher.setException(ie);
+              }
+            });
+    thread.start();
+  }
+
+  public void waitPushFinish() throws IOException {
+    dataPusher.checkException();
+    while (pushing) {
+      try {
+        Thread.sleep(100);

Review Comment:
   Got it, thanks for explanation.



##########
build/make-distribution.sh:
##########
@@ -127,7 +127,7 @@ function build_service {
   # Store the command as an array because $MVN variable might have spaces in it.
   # Normal quoting tricks don't work.
   # See: http://mywiki.wooledge.org/BashFAQ/050
-  BUILD_COMMAND=("$MVN" clean package -DskipTests $@)

Review Comment:
   yeah, this is a mistake



##########
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java:
##########
@@ -92,6 +101,22 @@ public SortBasedPusher(
     this.taskAttemptId = taskAttemptId;
     this.numMappers = numMappers;
     this.numPartitions = numPartitions;
+
+    shuffledPartitions = new int[numPartitions];
+    reverseShuffledPartitions = new int[numPartitions];
+    ArrayList<Integer> list = new ArrayList(numPartitions);
+    for (int i = 0; i < numPartitions; i++) {
+      list.add(i);
+    }
+    if (conf.pushSortRandomizePartitionIdEnabled()) {
+      Collections.shuffle(list);

Review Comment:
   seems Utils#randomizeInPlace do not accept int[]



##########
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java:
##########
@@ -219,6 +230,39 @@ public void insertRecord(
       pageCursor += recordSize;
     }
     inMemSorter.insertRecord(recordAddress, partitionId);
+
+    return true;
+  }
+
+  public void triggerPush() throws IOException {
+    dataPusher.checkException();
+    if (pushing) {
+      throw new IOException("Re-trigger push!");
+    }
+    pushing = true;
+    Thread thread =
+        new Thread(
+            () -> {
+              try {
+                pushData();
+                pushing = false;
+              } catch (IOException ie) {
+                dataPusher.setException(ie);
+              }
+            });
+    thread.start();
+  }
+
+  public void waitPushFinish() throws IOException {
+    dataPusher.checkException();
+    while (pushing) {
+      try {
+        Thread.sleep(100);

Review Comment:
   @FMX After further thinking, I believe we can't do as you said. insertRecord must be synchronized with pushData(), and if we just add lock to pushData and waitPushFinish, we can't guarantee insertRecord will not conflict with pushData.



##########
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java:
##########
@@ -92,6 +101,19 @@ public SortBasedPusher(
     this.taskAttemptId = taskAttemptId;
     this.numMappers = numMappers;
     this.numPartitions = numPartitions;
+
+    shuffledPartitions = new int[numPartitions];
+    reverseShuffledPartitions = new int[numPartitions];
+    for (int i = 0; i < numPartitions; i++) {
+      shuffledPartitions[i] = i;
+    }
+    if (conf.pushSortRandomizePartitionIdEnabled()) {
+      JavaUtils.randomizeInPlace(shuffledPartitions);
+      for (int i = 0; i < numPartitions; i++) {
+        reverseShuffledPartitions[shuffledPartitions[i]] = i;

Review Comment:
   modified, please have a look @pan3793 



##########
client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java:
##########
@@ -216,12 +272,26 @@ private void write0(scala.collection.Iterator iterator) throws IOException {
         pushGiantRecord(partitionId, serBuffer.getBuf(), serializedRecordSize);
       } else {
         long insertStartTime = System.nanoTime();
-        sortBasedPusher.insertRecord(
-            serBuffer.getBuf(),
-            Platform.BYTE_ARRAY_OFFSET,
-            serializedRecordSize,
-            partitionId,
-            false);
+        boolean success =
+            currentPusher.insertRecord(
+                serBuffer.getBuf(),
+                Platform.BYTE_ARRAY_OFFSET,
+                serializedRecordSize,
+                partitionId,
+                false);
+        if (!success) {
+          pushAndSwitch();
+          success =
+              currentPusher.insertRecord(
+                  serBuffer.getBuf(),
+                  Platform.BYTE_ARRAY_OFFSET,
+                  serializedRecordSize,
+                  partitionId,
+                  false);
+          if (!success) {
+            throw new IOException("Unable to push after switching pusher!");

Review Comment:
   done



##########
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java:
##########
@@ -218,7 +256,47 @@ public void insertRecord(
       Platform.copyMemory(recordBase, recordOffset, base, pageCursor, recordSize);
       pageCursor += recordSize;
     }
-    inMemSorter.insertRecord(recordAddress, partitionId);
+    if (shuffledPartitions != null) {
+      inMemSorter.insertRecord(recordAddress, shuffledPartitions[partitionId]);
+    } else {
+      inMemSorter.insertRecord(recordAddress, partitionId);
+    }
+
+    return true;
+  }
+
+  public void triggerPush() throws IOException {
+    asyncPushing = true;
+    dataPusher.checkException();
+    executorService.submit(
+        new Runnable() {
+          @Override
+          public void run() {
+            try {
+              pushData();
+              asyncPushing = false;
+            } catch (IOException ie) {
+              dataPusher.setException(ie);
+            }
+          }
+        });
+  }
+
+  /**
+   * Since this method and pushData() are synchronized When this method returns, it means pushData
+   * has released lock
+   *
+   * @throws IOException
+   */
+  public void waitPushFinish() throws IOException {
+    dataPusher.checkException();
+    while (asyncPushing) {
+      try {
+        Thread.sleep(50);

Review Comment:
   I prefer not, it's trivial



##########
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java:
##########
@@ -111,88 +133,104 @@ public SortBasedPusher(
             mapStatusLengths);
 
     pushBufferMaxSize = conf.pushBufferMaxSize();
-    pushSortMemoryThreshold = conf.pushSortMemoryThreshold();
+    this.pushSortMemoryThreshold = pushSortMemoryThreshold;
 
-    inMemSorter = new ShuffleInMemorySorter(this, 4 * 1024 * 1024);
+    int initialSize = Math.min((int) pushSortMemoryThreshold / 8, 1024 * 1024);
+    inMemSorter = new ShuffleInMemorySorter(this, initialSize);
+    this.sharedPushLock = sharedPushLock;
+    this.executorService = executorService;
   }
 
-  /**
-   * @return bytes of memory freed
-   * @throws IOException
-   */
   public long pushData() throws IOException {
-    final ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords =
-        inMemSorter.getSortedIterator();
-
-    byte[] dataBuf = new byte[pushBufferMaxSize];
-    int offSet = 0;
-    int currentPartition = -1;
-    while (sortedRecords.hasNext()) {
-      sortedRecords.loadNext();
-      final int partition = sortedRecords.packedRecordPointer.getPartitionId();
-      assert (partition >= currentPartition);
-      if (partition != currentPartition) {
-        if (currentPartition == -1) {
-          currentPartition = partition;
-        } else {
-          int bytesWritten =
-              rssShuffleClient.mergeData(
-                  appId,
-                  shuffleId,
-                  mapId,
-                  attemptNumber,
-                  currentPartition,
-                  dataBuf,
-                  0,
-                  offSet,
-                  numMappers,
-                  numPartitions);
-          mapStatusLengths[currentPartition].add(bytesWritten);
-          afterPush.accept(bytesWritten);
-          currentPartition = partition;
+    // pushData should be synchronized between pushers
+    synchronized (sharedPushLock) {
+      final ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords =
+          inMemSorter.getSortedIterator();
+
+      byte[] dataBuf = new byte[pushBufferMaxSize];
+      int offSet = 0;
+      int currentPartition = -1;
+      while (sortedRecords.hasNext()) {
+        sortedRecords.loadNext();
+        final int partition =
+            shuffledPartitions != null
+                ? inversedShuffledPartitions[sortedRecords.packedRecordPointer.getPartitionId()]
+                : sortedRecords.packedRecordPointer.getPartitionId();
+        if (partition != currentPartition) {
+          if (currentPartition == -1) {
+            currentPartition = partition;
+          } else {
+            int bytesWritten =
+                rssShuffleClient.mergeData(
+                    appId,
+                    shuffleId,
+                    mapId,
+                    attemptNumber,
+                    currentPartition,
+                    dataBuf,
+                    0,
+                    offSet,
+                    numMappers,
+                    numPartitions);
+            mapStatusLengths[currentPartition].add(bytesWritten);
+            afterPush.accept(bytesWritten);
+            currentPartition = partition;
+            offSet = 0;
+          }
+        }
+        final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();
+        final Object recordPage = taskMemoryManager.getPage(recordPointer);
+        final long recordOffsetInPage = taskMemoryManager.getOffsetInPage(recordPointer);
+        int recordSize = UnsafeAlignedOffset.getSize(recordPage, recordOffsetInPage);
+
+        if (offSet + recordSize > dataBuf.length) {
+          dataPusher.addTask(partition, dataBuf, offSet);
           offSet = 0;
         }
+
+        long recordReadPosition = recordOffsetInPage + uaoSize;
+        Platform.copyMemory(
+            recordPage,
+            recordReadPosition,
+            dataBuf,
+            Platform.BYTE_ARRAY_OFFSET + offSet,
+            recordSize);
+        offSet += recordSize;
       }
-      final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();
-      final Object recordPage = taskMemoryManager.getPage(recordPointer);
-      final long recordOffsetInPage = taskMemoryManager.getOffsetInPage(recordPointer);
-      int recordSize = UnsafeAlignedOffset.getSize(recordPage, recordOffsetInPage);
-
-      if (offSet + recordSize > dataBuf.length) {
-        dataPusher.addTask(partition, dataBuf, offSet);
-        offSet = 0;
+      if (offSet > 0) {
+        dataPusher.addTask(currentPartition, dataBuf, offSet);
       }
 
-      long recordReadPosition = recordOffsetInPage + uaoSize;
-      Platform.copyMemory(
-          recordPage, recordReadPosition, dataBuf, Platform.BYTE_ARRAY_OFFSET + offSet, recordSize);
-      offSet += recordSize;
-    }
-    if (offSet > 0) {
-      dataPusher.addTask(currentPartition, dataBuf, offSet);
-    }
+      long freedBytes = freeMemory();
+      inMemSorter.freeMemory();
 
-    long freedBytes = freeMemory();
-    inMemSorter.freeMemory();
-    return freedBytes;
+      return freedBytes;
+    }
   }
 
-  public void insertRecord(
+  /**
+   * @param recordBase
+   * @param recordOffset
+   * @param recordSize
+   * @param partitionId
+   * @param copySize
+   * @return false if reaches capacity threshold
+   * @throws IOException
+   */

Review Comment:
   done



##########
common/src/main/scala/org/apache/celeborn/common/meta/WorkerPartitionLocationInfo.scala:
##########
@@ -27,21 +28,30 @@ import org.apache.celeborn.common.protocol.PartitionLocation
 
 class WorkerPartitionLocationInfo extends Logging {
 
-  // key: ShuffleKey, values: (partitionId -> partition locations)
-  type PartitionInfo = util.HashMap[String, util.Map[Int, util.List[PartitionLocation]]]
+  // key: ShuffleKey, values: (partitionId -> (encodedPartitionId -> PartitionLocation))
+  type PartitionInfo = ConcurrentHashMap[String, ConcurrentHashMap[Long, PartitionLocation]]

Review Comment:
   It's indeed separated to https://github.com/apache/incubator-celeborn/commit/3c8c58e09d01d815d68e769970f9de03959d14ac
   I removed those code from this PR, please take a look



##########
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java:
##########
@@ -63,6 +66,10 @@ public class SortBasedPusher extends MemoryConsumer {
   CelebornConf conf;
   Consumer<Integer> afterPush;
   LongAdder[] mapStatusLengths;
+  Object globalPushLock;

Review Comment:
   global means the two pushers share the same lock, maybe rename to sharedPushLock



##########
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java:
##########
@@ -316,11 +379,7 @@ private void allocateMemoryForRecordIfNecessary(int required) throws IOException
 
   @Override
   public long spill(long l, MemoryConsumer memoryConsumer) throws IOException {
-    logger.info("Pushdata in spill, memory used " + getUsed());
-    if (getUsed() != 0) {
-      logger.info("Pushdata is not empty , do push.");
-      return pushData();
-    }
+    logger.warn("SortBasedPusher not support spill yet");

Review Comment:
   > Curious, why not push data when the `spill` is triggered? @waitinfuture
   
   Hi @cfmcgrady , thanks for your attention on this! There are originally two reasons for not pushing data when spill is triggered.
   
   First, when ```spill``` is called, we want it truly free memory, but Celeborn's pushData is async, which means even though ```freeMemory``` is called inside ```pushData```, the data may still be held in ```DataPusher``` before sent to wire. I understand this can be discussed that it's out of Spark's UnifiedMemoryManager.
   
   Second, this PR introduces double-buffering for pushing data (disabled by default), and the thread model has changed. Previously, pushData will be called in ```insertRecord```, ```close``` and ```spill```, these methods are called in the same Spark Task's thread. This PR will call pushData in a separate thread pool ```executorService```, so if we want to call pushData in ```spill```, we have to synchronize ```insertRecord``` make avoid concurrent issues, which might hurt perf.
   
   Still, I think we can trigger pushing data in spill when pipeline is disabled, especially if we config ```celeborn.push.sortMemory.threshold``` to a big value. WDYT?



##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -1857,6 +1860,17 @@ object CelebornConf extends Logging {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("60s")
 
+  val WORKER_REPLICATE_RANDOM_CONNECTION: ConfigEntry[Boolean] =
+    buildConf("celeborn.worker.replicate.randomConnection")

Review Comment:
   done



##########
docs/configuration/worker.md:
##########
@@ -87,6 +87,7 @@ license: |
 | celeborn.worker.replicate.fastFail.duration | 60s | If a replicate request not replied during the duration, worker will mark the replicate data request as failed. | 0.2.0 | 
 | celeborn.worker.replicate.io.threads | &lt;undefined&gt; | Netty IO thread number of worker to replicate shuffle data. The default threads number is the number of flush thread. | 0.2.0 | 
 | celeborn.worker.replicate.port | 0 | Server port for Worker to receive replicate data request from other Workers. | 0.2.0 | 
+| celeborn.worker.replicate.randomConnection | true | Whether worker will create random connection to peer when replicate data. | 0.2.1 | 

Review Comment:
   > What does "random connection" mean?
   
   In ```TransportClientFactory#createClient```, if -1 is passed as partitionId, it will use random index of clientPool.
   ```
       int clientIndex =
           partitionId < 0 ? rand.nextInt(numConnectionsPerPeer) : partitionId % numConnectionsPerPeer;
       TransportClient cachedClient = clientPool.clients[clientIndex];
   ```
   
   The problem is rather tricky. Before this PR I find that the number of replicate-server threads are always one, that's because even we pass partitionId to ```TransportClientFactory#createClient```, the calculated ```clientIndex``` is always the same, and this is related to the round-robin allocation of slots.
   Before this PR
   ```
   [hadoop@core-1-1(192.168.14.46) ~]$ cat tmp |grep replicate-s
   "replicate-server-9-1" #152 daemon prio=5 os_prio=0 tid=0x00007f8bf4001000 nid=0x753b runnable [0x00007f8b935f7000]
   ```
   After this PR
   ```
   [hadoop@core-1-1(192.168.14.46) ~]$ cat tmp |grep -i push-s
   "push-server-6-8" #147 daemon prio=5 os_prio=0 tid=0x00007fbe2400e000 nid=0x6ca9 runnable [0x00007fbfe90e0000]
   "push-server-6-7" #140 daemon prio=5 os_prio=0 tid=0x00007fbe2400c800 nid=0x6c95 runnable [0x00007fbfe97e7000]
   "push-server-6-6" #139 daemon prio=5 os_prio=0 tid=0x00007fbe2400a800 nid=0x6c93 runnable [0x00007fbfe98e8000]
   "push-server-6-5" #138 daemon prio=5 os_prio=0 tid=0x00007fbe24008800 nid=0x6c88 runnable [0x00007fbfea5ec000]
   "push-server-6-4" #135 daemon prio=5 os_prio=0 tid=0x00007fbe24007000 nid=0x6c7f runnable [0x00007fbfea8ef000]
   "push-server-6-3" #132 daemon prio=5 os_prio=0 tid=0x00007fbe24005000 nid=0x6c77 runnable [0x00007fbfeaff3000]
   "push-server-6-2" #130 daemon prio=5 os_prio=0 tid=0x00007fbe24003000 nid=0x6c6d runnable [0x00007fbfeb1f5000]
   "push-server-6-1" #128 daemon prio=5 os_prio=0 tid=0x00007fbe24001000 nid=0x6c65 runnable [0x00007fbfeb3f7000]
   ```



##########
docs/configuration/worker.md:
##########
@@ -87,6 +87,7 @@ license: |
 | celeborn.worker.replicate.fastFail.duration | 60s | If a replicate request not replied during the duration, worker will mark the replicate data request as failed. | 0.2.0 | 
 | celeborn.worker.replicate.io.threads | &lt;undefined&gt; | Netty IO thread number of worker to replicate shuffle data. The default threads number is the number of flush thread. | 0.2.0 | 
 | celeborn.worker.replicate.port | 0 | Server port for Worker to receive replicate data request from other Workers. | 0.2.0 | 
+| celeborn.worker.replicate.randomConnection | true | Whether worker will create random connection to peer when replicate data. | 0.2.1 | 

Review Comment:
   I will refine the doc



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -579,10 +578,7 @@ class PushDataHandler extends BaseMessageHandler with Logging {
           }
 
           try {
-            val client = pushClientFactory.createClient(
-              peer.getHost,
-              peer.getReplicatePort,
-              location.getId)
+            val client = getClient(peer.getHost, peer.getReplicatePort, location.getId)

Review Comment:
   I think it's fine since the purpose of this PR is to optimize PushData by several ways.



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -1090,6 +1086,14 @@ class PushDataHandler extends BaseMessageHandler with Logging {
     val id = partitionUniqueId.split("-")(0).toInt
     (PackedPartitionId.getRawPartitionId(id), PackedPartitionId.getAttemptId(id))
   }
+
+  private def getClient(host: String, port: Int, partitionId: Int): TransportClient = {
+    if (conf.workerReplicateRandomConnection) {
+      pushClientFactory.createClient(host, port)
+    } else {
+      pushClientFactory.createClient(host, port, partitionId)

Review Comment:
   done



##########
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java:
##########
@@ -92,6 +104,13 @@ public SortBasedPusher(
     this.taskAttemptId = taskAttemptId;
     this.numMappers = numMappers;
     this.numPartitions = numPartitions;
+
+    if (conf.pushSortRandomizePartitionIdEnabled()) {

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] FMX commented on a diff in pull request #1232: [CELEBORN-295] Pipeline sort pusher by double-buffering

Posted by "FMX (via GitHub)" <gi...@apache.org>.
FMX commented on code in PR #1232:
URL: https://github.com/apache/incubator-celeborn/pull/1232#discussion_r1109416320


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java:
##########
@@ -219,6 +230,39 @@ public void insertRecord(
       pageCursor += recordSize;
     }
     inMemSorter.insertRecord(recordAddress, partitionId);
+
+    return true;
+  }
+
+  public void triggerPush() throws IOException {
+    dataPusher.checkException();
+    if (pushing) {
+      throw new IOException("Re-trigger push!");
+    }
+    pushing = true;
+    Thread thread =
+        new Thread(
+            () -> {
+              try {
+                pushData();
+                pushing = false;
+              } catch (IOException ie) {
+                dataPusher.setException(ie);
+              }
+            });
+    thread.start();
+  }
+
+  public void waitPushFinish() throws IOException {
+    dataPusher.checkException();
+    while (pushing) {
+      try {
+        Thread.sleep(100);

Review Comment:
   A thread is pushing data while another is querying its state. Change pushing to lock could be enough to avoid waiting.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] waitinfuture commented on pull request #1232: [CELEBORN-295] Optimize data push

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on PR #1232:
URL: https://github.com/apache/incubator-celeborn/pull/1232#issuecomment-1446007080

   regression passed


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #1232: [CELEBORN-295] Pipeline sort pusher by double-buffering

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on code in PR #1232:
URL: https://github.com/apache/incubator-celeborn/pull/1232#discussion_r1107064802


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java:
##########
@@ -219,6 +230,39 @@ public void insertRecord(
       pageCursor += recordSize;
     }
     inMemSorter.insertRecord(recordAddress, partitionId);
+
+    return true;
+  }
+
+  public void triggerPush() throws IOException {
+    dataPusher.checkException();
+    if (pushing) {
+      throw new IOException("Re-trigger push!");
+    }
+    pushing = true;
+    Thread thread =
+        new Thread(
+            () -> {
+              try {
+                pushData();
+                pushing = false;
+              } catch (IOException ie) {
+                dataPusher.setException(ie);
+              }
+            });
+    thread.start();
+  }
+
+  public void waitPushFinish() throws IOException {
+    dataPusher.checkException();
+    while (pushing) {
+      try {
+        Thread.sleep(100);

Review Comment:
   I don't quite get your point, this thread waits for pushing finish from another thread, how to use lock/sync to refactor? Can you post a pseudo code?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] AngersZhuuuu commented on a diff in pull request #1232: [CELEBORN-295] Optimize data push

Posted by "AngersZhuuuu (via GitHub)" <gi...@apache.org>.
AngersZhuuuu commented on code in PR #1232:
URL: https://github.com/apache/incubator-celeborn/pull/1232#discussion_r1118316370


##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -1090,6 +1086,14 @@ class PushDataHandler extends BaseMessageHandler with Logging {
     val id = partitionUniqueId.split("-")(0).toInt
     (PackedPartitionId.getRawPartitionId(id), PackedPartitionId.getAttemptId(id))
   }
+
+  private def getClient(host: String, port: Int, partitionId: Int): TransportClient = {
+    if (conf.workerReplicateRandomConnection) {
+      pushClientFactory.createClient(host, port)
+    } else {
+      pushClientFactory.createClient(host, port, partitionId)

Review Comment:
   Pls change `spark.shuffle.io.numConnectionsPerPeer` in this method's comment.



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -579,10 +578,7 @@ class PushDataHandler extends BaseMessageHandler with Logging {
           }
 
           try {
-            val client = pushClientFactory.createClient(
-              peer.getHost,
-              peer.getReplicatePort,
-              location.getId)
+            val client = getClient(peer.getHost, peer.getReplicatePort, location.getId)

Review Comment:
   Should we make a separated pr for this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] waitinfuture commented on pull request #1232: [CELEBORN-295] Optimize data push

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on PR #1232:
URL: https://github.com/apache/incubator-celeborn/pull/1232#issuecomment-1442110611

   ping @FMX @pan3793 I have refactored the code, please take a look


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] cfmcgrady commented on a diff in pull request #1232: [CELEBORN-295] Optimize data push

Posted by "cfmcgrady (via GitHub)" <gi...@apache.org>.
cfmcgrady commented on code in PR #1232:
URL: https://github.com/apache/incubator-celeborn/pull/1232#discussion_r1195913456


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java:
##########
@@ -316,11 +379,7 @@ private void allocateMemoryForRecordIfNecessary(int required) throws IOException
 
   @Override
   public long spill(long l, MemoryConsumer memoryConsumer) throws IOException {
-    logger.info("Pushdata in spill, memory used " + getUsed());
-    if (getUsed() != 0) {
-      logger.info("Pushdata is not empty , do push.");
-      return pushData();
-    }
+    logger.warn("SortBasedPusher not support spill yet");

Review Comment:
   Curious, why not push data when the `spill` is triggered? @waitinfuture 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #1232: [CELEBORN-295] Optimize data push

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on code in PR #1232:
URL: https://github.com/apache/incubator-celeborn/pull/1232#discussion_r1257705644


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java:
##########
@@ -316,11 +379,7 @@ private void allocateMemoryForRecordIfNecessary(int required) throws IOException
 
   @Override
   public long spill(long l, MemoryConsumer memoryConsumer) throws IOException {
-    logger.info("Pushdata in spill, memory used " + getUsed());
-    if (getUsed() != 0) {
-      logger.info("Pushdata is not empty , do push.");
-      return pushData();
-    }
+    logger.warn("SortBasedPusher not support spill yet");

Review Comment:
   Agree



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org