Posted to commits@celeborn.apache.org by et...@apache.org on 2023/01/16 09:26:12 UTC

[incubator-celeborn] branch CELEBORN-228 created (now 0aca63a6)

This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a change to branch CELEBORN-228
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


      at 0aca63a6 [CELEBORN-228]Refactor PartitionFileSorter to avoid specific JDK dependency.

This branch includes the following new commits:

     new 0aca63a6 [CELEBORN-228]Refactor PartitionFileSorter to avoid specific JDK dependency.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-celeborn] 01/01: [CELEBORN-228]Refactor PartitionFileSorter to avoid specific JDK dependency.

Posted by et...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch CELEBORN-228
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git

commit 0aca63a67cb0231ab47e9398bdbf9849c14660c8
Author: Ethan Feng <et...@apache.org>
AuthorDate: Mon Jan 16 17:25:59 2023 +0800

    [CELEBORN-228]Refactor PartitionFileSorter to avoid specific JDK dependency.
---
 .../org/apache/celeborn/common/CelebornConf.scala  |  2 +-
 docs/configuration/worker.md                       |  2 +-
 .../worker/storage/PartitionFilesSorter.java       | 84 +++++++++++++++++-----
 3 files changed, 67 insertions(+), 21 deletions(-)
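
For context: the JDK-specific dependency removed here is sun.nio.ch.DirectBuffer,
an internal class used to eagerly free direct buffers via cleaner().clean(). It is
not part of the public JDK API and is typically inaccessible on newer JDKs without
extra JVM flags, so the refactor switches the sorter to heap buffers, which the
garbage collector reclaims on its own. A minimal standalone sketch of the two
patterns (illustrative class name and sizes, not code from the repository):

    import java.nio.ByteBuffer;

    public class HeapBufferSketch {
      public static void main(String[] args) {
        // Pattern removed by this commit (JDK-internal, shown only as a comment):
        //   ByteBuffer buf = ByteBuffer.allocateDirect(size);
        //   ((sun.nio.ch.DirectBuffer) buf).cleaner().clean();  // eager off-heap release
        //
        // Portable pattern used after the refactor: a heap buffer that needs
        // no explicit cleanup because the GC reclaims it.
        ByteBuffer indexBuf = ByteBuffer.allocate(16 * 1024);
        indexBuf.putLong(0L).putLong(16L); // e.g. one (offset, length) index entry
        indexBuf.flip();
        System.out.println("index bytes to write: " + indexBuf.remaining());
      }
    }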

diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 4e45b8f3..9d59f65c 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1901,7 +1901,7 @@ object CelebornConf extends Logging {
     buildConf("celeborn.worker.partitionSorter.reservedMemoryPerPartition")
       .withAlternative("rss.worker.initialReserveSingleSortMemory")
       .categories("worker")
-      .doc("Initial reserve memory when sorting a shuffle file off-heap.")
+      .doc("Reserved memory when sorting a shuffle file off-heap.")
       .version("0.2.0")
       .bytesConf(ByteUnit.BYTE)
       .createWithDefaultString("1mb")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 8548c44b..d5cb68cc 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -69,7 +69,7 @@ license: |
 | celeborn.worker.monitor.disk.sys.block.dir | /sys/block | The directory where linux file block information is stored. | 0.2.0 | 
 | celeborn.worker.noneEmptyDirExpireDuration | 1d | If a non-empty application shuffle data dir have not been operated during le duration time, will mark this application as expired. | 0.2.0 | 
 | celeborn.worker.partitionSorter.directMemoryRatioThreshold | 0.1 | Max ratio of partition sorter's memory for sorting, when reserved memory is higher than max partition sorter memory, partition sorter will stop sorting. | 0.2.0 | 
-| celeborn.worker.partitionSorter.reservedMemoryPerPartition | 1mb | Initial reserve memory when sorting a shuffle file off-heap. | 0.2.0 | 
+| celeborn.worker.partitionSorter.reservedMemoryPerPartition | 1mb | Reserved memory when sorting a shuffle file off-heap. | 0.2.0 | 
 | celeborn.worker.partitionSorter.sort.timeout | 220s | Timeout for a shuffle file to sort. | 0.2.0 | 
 | celeborn.worker.push.io.threads | 16 | Netty IO thread number of worker to handle client push data. The default threads number is 16. | 0.2.0 | 
 | celeborn.worker.push.port | 0 | Server port for Worker to receive push data request from ShuffleClient. | 0.2.0 | 
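
Usage note for the setting whose doc string changed above: assuming the standard
Celeborn deployment layout, where worker-side settings live in
conf/celeborn-defaults.conf, the per-partition reservation could be raised from
its 1mb default like this (illustrative value):

    # conf/celeborn-defaults.conf on the worker
    celeborn.worker.partitionSorter.reservedMemoryPerPartition    4mb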
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
index 6d3501d2..691f3c9d 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
@@ -47,7 +47,6 @@ import org.iq80.leveldb.DB;
 import org.iq80.leveldb.DBIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import sun.nio.ch.DirectBuffer;
 
 import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.common.identity.UserIdentifier;
@@ -353,7 +352,7 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
       indexSize += entry.getValue().size() * 16;
     }
 
-    ByteBuffer indexBuf = ByteBuffer.allocateDirect(indexSize);
+    ByteBuffer indexBuf = ByteBuffer.allocate(indexSize);
     for (Map.Entry<Integer, List<ShuffleBlockInfo>> entry : indexMap.entrySet()) {
       int mapId = entry.getKey();
       List<ShuffleBlockInfo> list = entry.getValue();
@@ -379,7 +378,6 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
       }
       indexFileChannel.close();
     }
-    ((DirectBuffer) indexBuf).cleaner().clean();
   }
 
   protected void readStreamFully(FSDataInputStream stream, ByteBuffer buffer, String path)
@@ -397,6 +395,7 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
     }
   }
 
+
   protected void readChannelFully(FileChannel channel, ByteBuffer buffer, String path)
       throws IOException {
     while (buffer.hasRemaining()) {
@@ -563,14 +562,7 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
 
           index += batchHeaderLen + compressedSize;
           paddingBuf.clear();
-          if (compressedSize > reserveMemory) {
-            ((DirectBuffer) paddingBuf).cleaner().clean();
-            paddingBuf = expandBufferAndUpdateMemoryTracker(reserveMemory, compressedSize);
-            reserveMemory = compressedSize;
-          }
-          paddingBuf.limit(compressedSize);
-          // TODO: compare skip or read performance differential
-          readBufferFully(paddingBuf);
+          readBufferBySize(paddingBuf, compressedSize);
         }
 
         long fileIndex = 0;
@@ -591,7 +583,6 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
           sortedBlockInfoMap.put(mapId, sortedShuffleBlocks);
         }
 
-        ((DirectBuffer) paddingBuf).cleaner().clean();
         memoryManager.releaseSortMemory(reserveMemory);
 
         writeIndex(sortedBlockInfoMap, indexFilePath, isHdfs);
@@ -656,14 +647,69 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
       }
     }
 
-    private ByteBuffer expandBufferAndUpdateMemoryTracker(int oldCapacity, int newCapacity)
-        throws InterruptedException {
-      memoryManager.releaseSortMemory(oldCapacity);
-      memoryManager.reserveSortMemory(newCapacity);
-      while (!memoryManager.sortMemoryReady()) {
-        Thread.sleep(20);
+
+    protected void readStreamBySize(
+            FSDataInputStream stream, ByteBuffer buffer, String path, int toRead) throws IOException {
+      int read = 0;
+      if (toRead < buffer.capacity()) {
+        buffer.limit(toRead);
+      }
+      while (read != toRead) {
+        int tmpRead = stream.read(buffer);
+        if (-1 == tmpRead) {
+          throw new IOException(
+                  "Unexpected EOF, file name : "
+                          + path
+                          + " position :"
+                          + stream.getPos()
+                          + " read size :"
+                          + read);
+        } else {
+          read += tmpRead;
+          if (!buffer.hasRemaining()) {
+            buffer.clear();
+            if (toRead - read < buffer.capacity()) {
+              buffer.limit(toRead - read);
+            }
+          }
+        }
       }
-      return ByteBuffer.allocateDirect(newCapacity);
     }
+    protected void readChannelBySize(FileChannel channel, ByteBuffer buffer, String path, int toRead)
+            throws IOException {
+      int read = 0;
+      if (toRead < buffer.capacity()) {
+        buffer.limit(toRead);
+      }
+      while (read != toRead) {
+        int tmpRead = channel.read(buffer);
+        if (-1 == tmpRead) {
+          throw new IOException(
+                  "Unexpected EOF, file name : "
+                          + path
+                          + " position :"
+                          + channel.position()
+                          + " read size :"
+                          + read);
+        } else {
+          read += tmpRead;
+          if (!buffer.hasRemaining()) {
+            buffer.clear();
+            if (toRead - read < buffer.capacity()) {
+              buffer.limit(toRead - read);
+            }
+          }
+        }
+      }
+    }
+
+    private void readBufferBySize(ByteBuffer buffer, int toRead) throws IOException {
+      if (isHdfs) {
+        readStreamBySize(hdfsOriginInput, buffer, originFilePath, toRead);
+      } else {
+        readChannelBySize(originFileChannel, buffer, originFilePath, toRead);
+      }
+    }
+
   }
 }
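
The new readStreamBySize/readChannelBySize helpers above replace the old
grow-a-direct-buffer-and-clean approach: instead of enlarging paddingBuf whenever
a block is larger than the reserved memory, the sorter now drains the block
through a fixed-capacity heap buffer, capping the buffer limit to the bytes still
outstanding and clearing the buffer each time it fills. A standalone sketch of
that loop against a plain FileChannel (class name, file path, and sizes are
illustrative, not from the repository):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;

    public class ReadBySizeSketch {

      // Read exactly `toRead` bytes from `channel` through a reusable buffer whose
      // capacity may be smaller than `toRead`: cap the limit to the bytes still
      // outstanding and clear the buffer whenever it fills up.
      static void readChannelBySize(FileChannel channel, ByteBuffer buffer, String path, int toRead)
          throws IOException {
        int read = 0;
        if (toRead < buffer.capacity()) {
          buffer.limit(toRead);
        }
        while (read != toRead) {
          int tmpRead = channel.read(buffer);
          if (tmpRead == -1) {
            throw new IOException(
                "Unexpected EOF, file name: " + path
                    + " position: " + channel.position()
                    + " read size: " + read);
          }
          read += tmpRead;
          if (!buffer.hasRemaining()) {
            buffer.clear();
            if (toRead - read < buffer.capacity()) {
              buffer.limit(toRead - read);
            }
          }
        }
      }

      public static void main(String[] args) throws IOException {
        String path = args[0];
        try (FileChannel channel = FileChannel.open(Paths.get(path), StandardOpenOption.READ)) {
          ByteBuffer buf = ByteBuffer.allocate(1024);          // fixed-size heap buffer, reused
          int toRead = (int) Math.min(4096, channel.size());   // read at most the first 4 KiB
          readChannelBySize(channel, buf, path, toRead);
          System.out.println("read " + toRead + " bytes from " + path);
        }
      }
    }

The FSDataInputStream variant added in the commit follows the same loop; only the
read call and the position lookup differ.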