Posted to issues@uniffle.apache.org by "zuston (via GitHub)" <gi...@apache.org> on 2023/03/13 09:55:08 UTC

[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #714: [#706] Implement spill method to avoid memory deadlock

zuston commented on code in PR #714:
URL: https://github.com/apache/incubator-uniffle/pull/714#discussion_r1133681805


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java:
##########
@@ -247,10 +282,45 @@ private void requestExecutorMemory(long leastMem) {
     }
   }
 
+  public List<AddBlockEvent> buildBlockEvents(List<ShuffleBlockInfo> shuffleBlockInfoList) {
+    long totalSize = 0;
+    long memoryUsed = 0;
+    List<AddBlockEvent> events = new ArrayList<>();
+    List<ShuffleBlockInfo> shuffleBlockInfosPerEvent = com.google.common.collect.Lists.newArrayList();
+    for (ShuffleBlockInfo sbi : shuffleBlockInfoList) {
+      totalSize += sbi.getSize();
+      memoryUsed += sbi.getFreeMemory();
+      shuffleBlockInfosPerEvent.add(sbi);
+      // split shuffle data according to the size
+      if (totalSize > sendSizeLimit) {
+        LOG.info("Build event with " + shuffleBlockInfosPerEvent.size()
+            + " blocks and " + totalSize + " bytes");
+        final long _memoryUsed = memoryUsed;
+        events.add(
+            new AddBlockEvent(taskId, shuffleBlockInfosPerEvent, () -> freeAllocatedMemory(_memoryUsed))
+        );
+        shuffleBlockInfosPerEvent = com.google.common.collect.Lists.newArrayList();
+        totalSize = 0;
+        memoryUsed = 0;
+      }
+    }
+    if (!shuffleBlockInfosPerEvent.isEmpty()) {
+      LOG.info("Build event with " + shuffleBlockInfosPerEvent.size()
+          + " blocks and " + totalSize + " bytes");
+      final long _memoryUsed = memoryUsed;
+      events.add(
+          new AddBlockEvent(taskId, shuffleBlockInfosPerEvent, () -> freeAllocatedMemory(_memoryUsed))
+      );
+    }
+    return events;
+  }
+
   @Override
   public long spill(long size, MemoryConsumer trigger) {
-    // there is no spill for such situation
-    return 0;
+    List<AddBlockEvent> events = buildBlockEvents(clear());

Review Comment:
   The spill method triggers the send operation asynchronously, for the following reasons.
   
   If we used sync mode and waited for the data to be sent, we would have to call `future.get(timeout)`. We also could not release the memory after the send completes, because the released data size has to be returned from this method instead.
   
   But if sending an event then failed, that memory would leak.
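   The asynchronous approach can be sketched roughly as below. This is a minimal, self-contained illustration, not the PR's code. `Event` is a hypothetical stand-in for `AddBlockEvent`, and `send`, `buffer` and `close` are hypothetical helpers. The sketch assumes each event's free-memory callback should run whether its send succeeds or fails, and it returns 0 from `spill` because nothing has been freed at the moment it returns. The real `spill` may report a different value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class AsyncSpillSketch {

  // Hypothetical stand-in for AddBlockEvent: a batch size plus a callback
  // that frees the memory held by that batch.
  static final class Event {
    final long size;
    final Runnable onProcessed;

    Event(long size, Runnable onProcessed) {
      this.size = size;
      this.onProcessed = onProcessed;
    }
  }

  // Memory currently held by buffered, not-yet-released data.
  final AtomicLong usedBytes = new AtomicLong();
  final ExecutorService sender = Executors.newSingleThreadExecutor();

  // Hypothetical send: fails for large batches to exercise the failure path.
  void send(Event e) {
    if (e.size > 1000) {
      throw new IllegalStateException("send failed");
    }
  }

  // Records buffered data and builds an event whose callback releases it.
  Event buffer(long size) {
    usedBytes.addAndGet(size);
    return new Event(size, () -> usedBytes.addAndGet(-size));
  }

  // Async spill: hand each event to the sender thread and return at once.
  // whenComplete runs the callback on success AND on failure, so a failed
  // send still releases its memory instead of leaking it.
  long spill(List<Event> events) {
    for (Event e : events) {
      CompletableFuture.runAsync(() -> send(e), sender)
          .whenComplete((ignored, error) -> e.onProcessed.run());
    }
    // Nothing has been freed yet when spill returns, so this sketch reports 0.
    return 0;
  }

  // Waits for in-flight sends and their callbacks to finish.
  void close() {
    sender.shutdown();
    try {
      sender.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) {
    AsyncSpillSketch m = new AsyncSpillSketch();
    List<Event> events = new ArrayList<>();
    events.add(m.buffer(400));
    events.add(m.buffer(2000)); // this send fails
    System.out.println("returned by spill: " + m.spill(events));
    m.close();
    System.out.println("bytes still held: " + m.usedBytes.get());
  }
}
```

   Running `main` reports 0 bytes still held, even though one of the two sends fails. With a sync `future.get(timeout)` design, the caller would instead have to decide what to free when the wait times out or throws. That is the leak risk described above.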



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

For additional commands, e-mail: issues-help@uniffle.apache.org