Posted to issues@celeborn.apache.org by "FMX (via GitHub)" <gi...@apache.org> on 2023/02/13 02:38:51 UTC

[GitHub] [incubator-celeborn] FMX opened a new pull request, #1221: [CELEBORN-243] Implement buffer stream.

FMX opened a new pull request, #1221:
URL: https://github.com/apache/incubator-celeborn/pull/1221

   ### What changes were proposed in this pull request?
   This PR enables Celeborn to support buffer streams and credit-based data flow.
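
For readers unfamiliar with the term: in credit-based flow control, the consumer grants the producer a number of credits per stream, and the producer sends at most one buffer per credit until more are granted. The following is a minimal sketch of that bookkeeping only, not the PR's implementation. The class `CreditLedger` and the methods `tryAcquire` and `available` are hypothetical names chosen for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class CreditLedger {
  // Credits currently granted to each stream, keyed by stream id.
  private final ConcurrentHashMap<Long, AtomicInteger> credits = new ConcurrentHashMap<>();

  // Called when the consumer grants more credits to a stream.
  void addCredit(long streamId, int numCredit) {
    credits.computeIfAbsent(streamId, id -> new AtomicInteger()).addAndGet(numCredit);
  }

  // Called by the producer before sending one buffer.
  // Returns false when the stream has no credit left, so nothing is sent.
  boolean tryAcquire(long streamId) {
    AtomicInteger counter = credits.get(streamId);
    if (counter == null) {
      return false;
    }
    while (true) {
      int current = counter.get();
      if (current <= 0) {
        return false;
      }
      if (counter.compareAndSet(current, current - 1)) {
        return true;
      }
    }
  }

  // Remaining credits for a stream; unknown streams have none.
  int available(long streamId) {
    AtomicInteger counter = credits.get(streamId);
    return counter == null ? 0 : counter.get();
  }

  public static void main(String[] args) {
    CreditLedger ledger = new CreditLedger();
    ledger.addCredit(1L, 2);
    System.out.println(ledger.tryAcquire(1L)); // first buffer may be sent
    System.out.println(ledger.tryAcquire(1L)); // second buffer may be sent
    System.out.println(ledger.tryAcquire(1L)); // no credit left until the consumer grants more
  }
}
```

The compare-and-set loop keeps the counter from going negative when several sender threads compete for the last credit.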
   
   
   ### Why are the changes needed?
   To support buffer stream.
   
   
   ### Does this PR introduce _any_ user-facing change?
   NO.
   
   
   ### How was this patch tested?
   Flink Integration test.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] zhongqiangczq commented on a diff in pull request #1221: [CELEBORN-234] Implement buffer stream.

Posted by "zhongqiangczq (via GitHub)" <gi...@apache.org>.
zhongqiangczq commented on code in PR #1221:
URL: https://github.com/apache/incubator-celeborn/pull/1221#discussion_r1108110144


##########
common/src/main/scala/org/apache/celeborn/common/util/Utils.scala:
##########
@@ -994,4 +996,64 @@ object Utils extends Logging {
     }
   }
 
+  def checkedDownCast(value: Long): Int = {
+    val downCast = value.toInt
+    if (downCast.toLong != value) {
+      throw new IllegalArgumentException("Cannot downcast long value " + value + " to integer.")
+    }
+    downCast
+  }
+
+  @throws[IOException]
+  def checkFileIntegrity(fileChannel: FileChannel, length: Int): Unit = {
+    val remainingBytes = fileChannel.size - fileChannel.position
+    if (remainingBytes < length) {
+      logError(
+        s"File remaining bytes not not enough, remaining: ${remainingBytes}, wanted: ${length}.")
+      throw new RuntimeException(s"File is corrupted ${fileChannel}")
+    }
+  }
+
+  @throws[IOException]
+  def readBuffer(fileChannel: FileChannel, buffer: ByteBuffer, length: Int): Unit = {
+    checkFileIntegrity(fileChannel, length)
+    buffer.clear
+    buffer.limit(length)
+    while (buffer.hasRemaining) fileChannel.read(buffer)
+    buffer.flip
+  }
+
+  def readBuffer(fileChannel: FileChannel, buffer: ByteBuf, length: Int): Unit = {
+    checkFileIntegrity(fileChannel, length)
+    val tmpBuffer = ByteBuffer.allocate(length)
+    while (tmpBuffer.hasRemaining) fileChannel.read(tmpBuffer)
+    tmpBuffer.flip()
+    buffer.writeBytes(tmpBuffer)
+  }
+
+  @throws[IOException]
+  def readBuffer(
+      fileChannel: FileChannel,
+      header: ByteBuffer,
+      buffer: ByteBuf,
+      headerSize: Int): Int = {
+    readBuffer(fileChannel, header, headerSize)
+    val bufferLength = header.getInt(12)
+    if (bufferLength <= 0 || bufferLength > buffer.capacity) {
+      logError(s"Incorrect buffer header, buffer length: ${bufferLength}.")
+      throw new RuntimeException(s"File ${fileChannel} is corrupted")

Review Comment:
   From fileChannel alone we can't get the file name when logging errors. Can readBuffer take an additional file name parameter?
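
One way the suggestion above might look, as a minimal sketch: the integrity check takes the file path as an extra argument so the error message names the file rather than the channel object. The class `NamedIntegrityCheck`, the helper `corruptedMessage`, and the added `filePath` parameter are assumptions for illustration and are not part of the quoted diff.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class NamedIntegrityCheck {
  // Builds the error text; kept separate so it can be checked without touching a file.
  static String corruptedMessage(String filePath, long remaining, int wanted) {
    return "File " + filePath + " is corrupted, remaining: " + remaining + ", wanted: " + wanted;
  }

  // Hypothetical variant of checkFileIntegrity: the filePath parameter is the
  // reviewer's suggestion and does not appear in the PR as quoted.
  static void checkFileIntegrity(FileChannel channel, String filePath, int length)
      throws IOException {
    long remaining = channel.size() - channel.position();
    if (remaining < length) {
      throw new IOException(corruptedMessage(filePath, remaining, length));
    }
  }

  public static void main(String[] args) throws IOException {
    Path tmp = Files.createTempFile("partition", ".data");
    try {
      Files.write(tmp, new byte[8]);
      try (FileChannel channel = FileChannel.open(tmp, StandardOpenOption.READ)) {
        checkFileIntegrity(channel, tmp.toString(), 8); // 8 bytes available: passes
        try {
          checkFileIntegrity(channel, tmp.toString(), 16); // only 8 bytes: fails
        } catch (IOException e) {
          System.out.println(e.getMessage()); // the message includes the file path
        }
      }
    } finally {
      Files.deleteIfExists(tmp);
    }
  }
}
```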





[GitHub] [incubator-celeborn] zhongqiangczq commented on a diff in pull request #1221: [CELEBORN-234] Implement buffer stream.

Posted by "zhongqiangczq (via GitHub)" <gi...@apache.org>.
zhongqiangczq commented on code in PR #1221:
URL: https://github.com/apache/incubator-celeborn/pull/1221#discussion_r1108107628


##########
common/src/main/scala/org/apache/celeborn/common/util/Utils.scala:
##########
@@ -994,4 +996,64 @@ object Utils extends Logging {
     }
   }
 
+  def checkedDownCast(value: Long): Int = {
+    val downCast = value.toInt
+    if (downCast.toLong != value) {
+      throw new IllegalArgumentException("Cannot downcast long value " + value + " to integer.")
+    }
+    downCast
+  }
+
+  @throws[IOException]
+  def checkFileIntegrity(fileChannel: FileChannel, length: Int): Unit = {
+    val remainingBytes = fileChannel.size - fileChannel.position
+    if (remainingBytes < length) {
+      logError(
+        s"File remaining bytes not not enough, remaining: ${remainingBytes}, wanted: ${length}.")
+      throw new RuntimeException(s"File is corrupted ${fileChannel}")
+    }
+  }
+
+  @throws[IOException]
+  def readBuffer(fileChannel: FileChannel, buffer: ByteBuffer, length: Int): Unit = {
+    checkFileIntegrity(fileChannel, length)
+    buffer.clear
+    buffer.limit(length)
+    while (buffer.hasRemaining) fileChannel.read(buffer)
+    buffer.flip
+  }
+
+  def readBuffer(fileChannel: FileChannel, buffer: ByteBuf, length: Int): Unit = {
+    checkFileIntegrity(fileChannel, length)
+    val tmpBuffer = ByteBuffer.allocate(length)
+    while (tmpBuffer.hasRemaining) fileChannel.read(tmpBuffer)
+    tmpBuffer.flip()
+    buffer.writeBytes(tmpBuffer)
+  }
+
+  @throws[IOException]
+  def readBuffer(
+      fileChannel: FileChannel,
+      header: ByteBuffer,
+      buffer: ByteBuf,
+      headerSize: Int): Int = {
+    readBuffer(fileChannel, header, headerSize)
+    val bufferLength = header.getInt(12)
+    if (bufferLength <= 0 || bufferLength > buffer.capacity) {
+      logError(s"Incorrect buffer header, buffer length: ${bufferLength}.")
+      throw new RuntimeException(s"File ${fileChannel} is corrupted")
+    }
+    // attach header buffer to data buffer.
+    buffer.writeBytes(header)
+    readBuffer(fileChannel, buffer, bufferLength)
+    bufferLength + headerSize
+  }
+
+  def getShortFormattedFileName(fileInfo: FileInfo): String = {

Review Comment:
   getShortFormattedFileName can be removed; another PR will remove it.





[GitHub] [incubator-celeborn] codecov[bot] commented on pull request #1221: [CELEBORN-234] Implement buffer stream.

Posted by "codecov[bot] (via GitHub)" <gi...@apache.org>.
codecov[bot] commented on PR #1221:
URL: https://github.com/apache/incubator-celeborn/pull/1221#issuecomment-1432676789

   # [Codecov](https://codecov.io/gh/apache/incubator-celeborn/pull/1221?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#1221](https://codecov.io/gh/apache/incubator-celeborn/pull/1221?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (e29fb50) into [main](https://codecov.io/gh/apache/incubator-celeborn/commit/534853bf8a001f736a70127a1759bdbda7033a01?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (534853b) will **decrease** coverage by `0.36%`.
   > The diff coverage is `9.90%`.
   
   ```diff
   @@             Coverage Diff              @@
   ##               main    #1221      +/-   ##
   ============================================
   - Coverage     27.17%   26.81%   -0.36%     
   - Complexity      805      808       +3     
   ============================================
     Files           213      214       +1     
     Lines         18043    18387     +344     
     Branches       1967     2001      +34     
   ============================================
   + Hits           4901     4928      +27     
   - Misses        12815    13135     +320     
   + Partials        327      324       -3     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-celeborn/pull/1221?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...java/org/apache/celeborn/common/meta/FileInfo.java](https://codecov.io/gh/apache/incubator-celeborn/pull/1221?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9jZWxlYm9ybi9jb21tb24vbWV0YS9GaWxlSW5mby5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...ache/celeborn/common/network/protocol/Message.java](https://codecov.io/gh/apache/incubator-celeborn/pull/1221?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9jZWxlYm9ybi9jb21tb24vbmV0d29yay9wcm90b2NvbC9NZXNzYWdlLmphdmE=) | `50.57% <ø> (-0.01%)` | :arrow_down: |
   | [...che/celeborn/common/network/protocol/ReadData.java](https://codecov.io/gh/apache/incubator-celeborn/pull/1221?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9jZWxlYm9ybi9jb21tb24vbmV0d29yay9wcm90b2NvbC9SZWFkRGF0YS5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...orn/common/network/server/BufferStreamManager.java](https://codecov.io/gh/apache/incubator-celeborn/pull/1221?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9jZWxlYm9ybi9jb21tb24vbmV0d29yay9zZXJ2ZXIvQnVmZmVyU3RyZWFtTWFuYWdlci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...on/network/server/memory/ReadBufferDispatcher.java](https://codecov.io/gh/apache/incubator-celeborn/pull/1221?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9jZWxlYm9ybi9jb21tb24vbmV0d29yay9zZXJ2ZXIvbWVtb3J5L1JlYWRCdWZmZXJEaXNwYXRjaGVyLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [.../scala/org/apache/celeborn/common/util/Utils.scala](https://codecov.io/gh/apache/incubator-celeborn/pull/1221?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvY2VsZWJvcm4vY29tbW9uL3V0aWwvVXRpbHMuc2NhbGE=) | `6.48% <0.00%> (-0.43%)` | :arrow_down: |
   | [...service/deploy/worker/storage/StorageManager.scala](https://codecov.io/gh/apache/incubator-celeborn/pull/1221?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-d29ya2VyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvY2VsZWJvcm4vc2VydmljZS9kZXBsb3kvd29ya2VyL3N0b3JhZ2UvU3RvcmFnZU1hbmFnZXIuc2NhbGE=) | `0.00% <0.00%> (ø)` | |
   | [.../celeborn/service/deploy/worker/FetchHandler.scala](https://codecov.io/gh/apache/incubator-celeborn/pull/1221?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-d29ya2VyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvY2VsZWJvcm4vc2VydmljZS9kZXBsb3kvd29ya2VyL0ZldGNoSGFuZGxlci5zY2FsYQ==) | `37.23% <30.77%> (+0.02%)` | :arrow_up: |
   | [...he/celeborn/common/network/util/TransportConf.java](https://codecov.io/gh/apache/incubator-celeborn/pull/1221?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9jZWxlYm9ybi9jb21tb24vbmV0d29yay91dGlsL1RyYW5zcG9ydENvbmYuamF2YQ==) | `78.27% <75.00%> (-3.55%)` | :arrow_down: |
   | [...cala/org/apache/celeborn/common/CelebornConf.scala](https://codecov.io/gh/apache/incubator-celeborn/pull/1221?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvY2VsZWJvcm4vY29tbW9uL0NlbGVib3JuQ29uZi5zY2FsYQ==) | `81.06% <85.72%> (+0.06%)` | :arrow_up: |
   | ... and [7 more](https://codecov.io/gh/apache/incubator-celeborn/pull/1221?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   




[GitHub] [incubator-celeborn] zhongqiangczq commented on a diff in pull request #1221: [CELEBORN-234] Implement buffer stream.

Posted by "zhongqiangczq (via GitHub)" <gi...@apache.org>.
zhongqiangczq commented on code in PR #1221:
URL: https://github.com/apache/incubator-celeborn/pull/1221#discussion_r1108111418


##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -2948,4 +2954,28 @@ object CelebornConf extends Logging {
       .doc("The size of network buffers required per result partition. The minimum valid value is 8M. Usually, several hundreds of megabytes memory is enough for large scale batch jobs.")
       .stringConf
       .createWithDefault("64m")
+
+  val PARTITION_READ_BUFFERS_MIN: ConfigEntry[Int] =
+    buildConf("celeborn.partition.initial.readBufferMin")

Review Comment:
   Can readBufferMin be renamed to readBuffersMin?





[GitHub] [incubator-celeborn] RexXiong commented on pull request #1221: [CELEBORN-234] Implement buffer stream.

Posted by "RexXiong (via GitHub)" <gi...@apache.org>.
RexXiong commented on PR #1221:
URL: https://github.com/apache/incubator-celeborn/pull/1221#issuecomment-1434369450

   Please add more unit tests for the Flink plugin.




[GitHub] [incubator-celeborn] zhongqiangczq commented on a diff in pull request #1221: [CELEBORN-234] Implement buffer stream.

Posted by "zhongqiangczq (via GitHub)" <gi...@apache.org>.
zhongqiangczq commented on code in PR #1221:
URL: https://github.com/apache/incubator-celeborn/pull/1221#discussion_r1108122034


##########
common/src/main/java/org/apache/celeborn/common/network/server/BufferStreamManager.java:
##########
@@ -49,24 +76,507 @@ public int getBufferSize() {
     }
   }
 
-  public BufferStreamManager() {
+  public BufferStreamManager(int minReadBuffers, int maxReadBuffers, int threadsPerMountpoint) {
     nextStreamId = new AtomicLong((long) new Random().nextInt(Integer.MAX_VALUE) * 1000);
     streams = new ConcurrentHashMap<>();
+    streamCredits = new ConcurrentHashMap<>();
+    servingStreams = new ConcurrentHashMap<>();
+    activeMapPartitions = new ConcurrentHashMap<>();
+    this.minReadBuffers = minReadBuffers;
+    this.maxReadBuffers = maxReadBuffers;
+    this.threadsPerMountPoint = threadsPerMountpoint;
   }
 
-  public long registerStream(Channel channel, int bufferSize) {
+  public long registerStream(
+      Channel channel, int initialCredit, int startSubIndex, int endSubIndex, FileInfo fileInfo)
+      throws IOException {
     long streamId = nextStreamId.getAndIncrement();
-    streams.put(streamId, new StreamState(channel, bufferSize));
+    streams.put(streamId, new StreamState(channel, fileInfo.getBufferSize()));
+
+    MapDataPartition mapDataPartition =
+        activeMapPartitions.computeIfAbsent(fileInfo, (f) -> new MapDataPartition(fileInfo));
+    activeMapPartitions.put(fileInfo, mapDataPartition);
+
+    mapDataPartition.addStream(streamId);
+    addCredit(initialCredit, streamId);
+    servingStreams.put(streamId, mapDataPartition);
+    mapDataPartition.setupDataPartitionReader(startSubIndex, endSubIndex, streamId);
+
+    logger.debug("Register stream streamId: {}, fileInfo: {}", streamId, fileInfo);
+
     return streamId;
   }
 
-  public void addCredit(int numCredit, long streamId) {}
+  public void addCredit(int numCredit, long streamId) {
+    logger.debug("streamId: {}, add credit: {}", streamId, numCredit);
+    streamCredits.compute(
+        streamId,
+        (aLong, atomicInteger) -> {
+          if (atomicInteger == null) {
+            return new AtomicInteger(numCredit);
+          } else {
+            atomicInteger.getAndAdd(numCredit);
+            return atomicInteger;
+          }
+        });
+
+    MapDataPartition mapDataPartition = servingStreams.get(streamId);
+    if (mapDataPartition != null) {
+      DataPartitionReader streamReader = mapDataPartition.getStreamReader(streamId);
+      if (streamReader != null) {
+        streamReader.sendData();
+      }
+    }
+  }
 
   public void connectionTerminated(Channel channel) {
     for (Map.Entry<Long, StreamState> entry : streams.entrySet()) {
       if (entry.getValue().getAssociatedChannel() == channel) {
-        streams.remove(entry.getKey());
+        logger.info("connection closed, clean streamId: {}", entry.getKey());
+        cleanResource(entry.getKey());
+      }
+    }
+  }
+
+  public synchronized void cleanResource(long streamId) {
+    logger.debug("clean stream:" + streamId);
+    streams.remove(streamId);
+    streamCredits.remove(streamId);
+    FileInfo fileInfo = servingStreams.remove(streamId).fileInfo;
+    MapDataPartition mapDataPartition = activeMapPartitions.get(fileInfo);
+    mapDataPartition.removeStream(streamId);
+    if (mapDataPartition.activeStreamIds.isEmpty()) {
+      for (ByteBuf buffer : mapDataPartition.buffers) {
+        memoryManager.recycleReadBuffer(buffer);
+      }
+      activeMapPartitions.remove(fileInfo);
+    }
+  }
+
+  // this means active data partition
+  protected class MapDataPartition {
+    private final List<Long> activeStreamIds = new ArrayList<>();
+    private final FileInfo fileInfo;
+    private final Set<DataPartitionReader> readers = new HashSet<>();
+    private final ExecutorService readExecutor;
+    private final ConcurrentHashMap<Long, DataPartitionReader> streamReaders =
+        new ConcurrentHashMap<>();
+
+    /** All available buffers can be used by the partition readers for reading. */
+    protected Queue<ByteBuf> buffers;
+
+    public MapDataPartition(FileInfo fileInfo) {
+      this.fileInfo = fileInfo;
+      readExecutor = storageFetcherPool.getExecutorPool(fileInfo.getFilePath());
+    }
+
+    public synchronized void setupDataPartitionReader(
+        int startSubIndex, int endSubIndex, long streamId) throws IOException {
+      DataPartitionReader dataPartitionReader =
+          new DataPartitionReader(startSubIndex, endSubIndex, fileInfo, streamId);
+      dataPartitionReader.open();
+      // allocate resources when the first reader is registered
+      boolean allocateResources = readers.isEmpty();
+      readers.add(dataPartitionReader);
+      streamReaders.put(streamId, dataPartitionReader);
+
+      // create initial buffers for read
+      if (allocateResources) {
+        memoryManager.requestReadBuffers(
+            minReadBuffers,
+            maxReadBuffers,
+            fileInfo.getBufferSize(),
+            (allocatedBuffers, throwable) ->
+                MapDataPartition.this.onBuffer(new LinkedBlockingDeque<>(allocatedBuffers)));
+      } else {
+        triggerRead();
+      }
+    }
+
+    public DataPartitionReader getStreamReader(long streamId) {
+      return streamReaders.get(streamId);
+    }
+
+    // Read logic is executed on another thread.
+    public void onBuffer(Queue<ByteBuf> buffers) {
+      this.buffers = buffers;
+      triggerRead();
+    }
+
+    public void recycle(ByteBuf buffer, Queue<ByteBuf> bufferQueue) {
+      buffer.clear();
+      bufferQueue.add(buffer);
+      // avoid unnecessary thread switch
+      readBuffers();
+    }
+
+    public void readBuffers() {
+      PriorityQueue<DataPartitionReader> sortedReaders = new PriorityQueue<>(readers);
+      while (buffers.size() > 0 && !sortedReaders.isEmpty()) {
+        DataPartitionReader reader = sortedReaders.poll();
+        try {
+          if (!reader.readAndSend(buffers, (buffer) -> this.recycle(buffer, buffers))) {
+            readers.remove(reader);
+          }
+        } catch (IOException e) {
+          logger.error("Read thread error occurred, {}", e);
+          throw new RuntimeException(e);
+        }
+      }
+    }
+
+    public void triggerRead() {
+      readExecutor.submit(
+          () -> {
+            // Key for IO schedule.
+            synchronized (MapDataPartition.this) {
+              readBuffers();
+            }
+          });
+    }
+
+    public synchronized void addStream(long streamId) {
+      activeStreamIds.add(streamId);
+    }
+
+    public synchronized void removeStream(long streamId) {
+      activeStreamIds.remove(streamId);
+    }
+  }
+
+  private class Buffer {
+    private ByteBuf byteBuf;
+    private Consumer<ByteBuf> byteBufferConsumer;
+
+    public Buffer(ByteBuf byteBuf, Consumer<ByteBuf> byteBufferConsumer) {
+      this.byteBuf = byteBuf;
+      this.byteBufferConsumer = byteBufferConsumer;
+    }
+  }
+
+  // this is a specific partition reader
+  protected class DataPartitionReader implements Comparable<DataPartitionReader> {
+    private final ByteBuffer indexBuffer;
+    private final ByteBuffer headerBuffer;
+    private final int startPartitionIndex;
+    private final int endPartitionIndex;
+    private int numRegions;
+    private FileChannel dataFileChannel;
+    private FileChannel indexFileChannel;
+    private int numRemainingPartitions;
+    private int currentDataRegion = -1;
+    private long dataConsumingOffset;
+    private volatile long currentPartitionRemainingBytes;
+    private boolean isClosed;
+    private FileInfo fileInfo;
+    private int INDEX_ENTRY_SIZE = 16;
+    private long streamId;
+    protected final Object lock = new Object();
+
+    @GuardedBy("lock")
+    protected final Queue<Buffer> buffersRead = new ArrayDeque<>();
+
+    /** Whether all the data has been successfully read or not. */
+    @GuardedBy("lock")
+    protected boolean isFinished;
+
+    /** Whether this partition reader has been released or not. */
+    @GuardedBy("lock")
+    protected boolean isReleased;
+
+    /** Exception causing the release of this partition reader. */
+    @GuardedBy("lock")
+    protected Throwable errorCause;
+
+    /** Whether there is any error at the consumer side or not. */
+    @GuardedBy("lock")
+    protected boolean isError;
+
+    public DataPartitionReader(
+        int startPartitionIndex, int endPartitionIndex, FileInfo fileInfo, long streamId) {
+      this.startPartitionIndex = startPartitionIndex;
+      this.endPartitionIndex = endPartitionIndex;
+
+      int indexBufferSize = 16 * (endPartitionIndex - startPartitionIndex + 1);
+      this.indexBuffer = ByteBuffer.allocateDirect(indexBufferSize);
+
+      this.headerBuffer = ByteBuffer.allocateDirect(16);
+      this.streamId = streamId;
+
+      this.fileInfo = fileInfo;
+      this.isClosed = false;
+    }
+
+    public void open() throws IOException {
+      this.dataFileChannel = new FileInputStream(fileInfo.getFile()).getChannel();
+      this.indexFileChannel = new FileInputStream(fileInfo.getIndexPath()).getChannel();
+
+      long indexFileSize = indexFileChannel.size();
+      // index is (offset,length)
+      long indexRegionSize = fileInfo.getNumReducerPartitions() * (long) INDEX_ENTRY_SIZE;
+      this.numRegions = Utils.checkedDownCast(indexFileSize / indexRegionSize);
+
+      updateConsumingOffset();
+    }
+
+    public long getIndexRegionSize() {
+      return fileInfo.getNumReducerPartitions() * (long) INDEX_ENTRY_SIZE;
+    }
+
+    private void updateConsumingOffset() throws IOException {
+      while (currentPartitionRemainingBytes == 0
+          && (currentDataRegion < numRegions - 1 || numRemainingPartitions > 0)) {
+        if (numRemainingPartitions <= 0) {
+          ++currentDataRegion;
+          numRemainingPartitions = endPartitionIndex - startPartitionIndex + 1;
+
+          // read the target index entry to the target index buffer
+          indexFileChannel.position(
+              currentDataRegion * getIndexRegionSize()
+                  + (long) startPartitionIndex * INDEX_ENTRY_SIZE);
+          Utils.readBuffer(indexFileChannel, indexBuffer, indexBuffer.capacity());
+        }
+
+        // get the data file offset and the data size
+        dataConsumingOffset = indexBuffer.getLong();
+        currentPartitionRemainingBytes = indexBuffer.getLong();
+        --numRemainingPartitions;
+
+        logger.debug(
+            "readBuffer updateConsumingOffset, {},  {}, {}, {}",
+            streamId,
+            dataFileChannel.size(),
+            dataConsumingOffset,
+            currentPartitionRemainingBytes);
+
+        // if these checks fail, the partition file must be corrupted
+        if (dataConsumingOffset < 0
+            || dataConsumingOffset + currentPartitionRemainingBytes > dataFileChannel.size()
+            || currentPartitionRemainingBytes < 0) {
+          throw new RuntimeException("File " + fileInfo.getFilePath() + " is corrupted");
+        }
+      }
+    }
+
+    public boolean readBuffer(ByteBuf buffer) throws IOException {
+      try {
+        dataFileChannel.position(dataConsumingOffset);
+
+        int readSize =
+            Utils.readBuffer(dataFileChannel, headerBuffer, buffer, headerBuffer.capacity());
+        currentPartitionRemainingBytes -= readSize;
+
+        logger.debug(
+            "readBuffer data: {}, {}, {}, {}, {}, {}",
+            streamId,
+            currentPartitionRemainingBytes,
+            readSize,
+            dataConsumingOffset,
+            Utils.getShortFormattedFileName(fileInfo),

Review Comment:
   Utils.getShortFormattedFileName(fileInfo) => fileInfo.getFilePath, because getShortFormattedFileName can't distinguish between different applications.
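
A small illustration of the concern, under a loud assumption: the body of getShortFormattedFileName is not shown in the quoted diff, so the `shortName` helper below simply takes the last path component as a stand-in. The class name and the directory layout in the example paths are likewise hypothetical.

```java
import java.nio.file.Paths;

final class LogNameSketch {
  // Stand-in for a "short" file name: only the last path component.
  // This is an assumption, not the behavior of getShortFormattedFileName.
  static String shortName(String filePath) {
    return Paths.get(filePath).getFileName().toString();
  }

  public static void main(String[] args) {
    // Hypothetical layout: the same partition file name under two applications.
    String first = "/data/app-1/shuffle-0/0-0-0";
    String second = "/data/app-2/shuffle-0/0-0-0";

    // The short names are identical, so log lines for the two files look the same.
    System.out.println(shortName(first) + " vs " + shortName(second));
    // The full paths differ, so logging the path tells the applications apart.
    System.out.println(first + " vs " + second);
  }
}
```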





[GitHub] [incubator-celeborn] zhongqiangczq commented on a diff in pull request #1221: [CELEBORN-234] Implement buffer stream.

Posted by "zhongqiangczq (via GitHub)" <gi...@apache.org>.
zhongqiangczq commented on code in PR #1221:
URL: https://github.com/apache/incubator-celeborn/pull/1221#discussion_r1108106326


##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java:
##########
@@ -199,6 +199,7 @@ public synchronized void destroy(IOException ioException) {
   public void pushDataHandShake(int numReducePartitions, int bufferSize) {
     this.numReducePartitions = numReducePartitions;
     fileInfo.setBufferSize(bufferSize);
+    fileInfo.setNumReducerPartitions(numReducePartitions);

Review Comment:
   "fileInfo.setNumReducerPartitions(numReducePartitions);" will be added by another PR.





[GitHub] [incubator-celeborn] zhongqiangczq commented on a diff in pull request #1221: [CELEBORN-234] Implement buffer stream.

Posted by "zhongqiangczq (via GitHub)" <gi...@apache.org>.
zhongqiangczq commented on code in PR #1221:
URL: https://github.com/apache/incubator-celeborn/pull/1221#discussion_r1108102896


##########
common/src/main/java/org/apache/celeborn/common/network/protocol/ReadData.java:
##########
@@ -14,48 +14,39 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.celeborn.common.network.protocol;
 
 import java.util.Objects;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 
+import org.apache.celeborn.common.network.buffer.NettyManagedBuffer;
+
+// This is buffer wrapper used in celeborn worker only
+// It doesn't need decode in worker.
 public class ReadData extends RequestMessage {
   private long streamId;
   private int backlog;
   private long offset;
-  private ByteBuf buf;
 
   public ReadData(long streamId, int backlog, long offset, ByteBuf buf) {
+    super(new NettyManagedBuffer(buf));
     this.streamId = streamId;
     this.backlog = backlog;
     this.offset = offset;
-    this.buf = buf;
   }
 
   @Override
   public int encodedLength() {
-    return 8 + 4 + 4 + 8 + 4 + buf.readableBytes();
+    return 8 + 4 + 8;
   }
 
   @Override
-  public void encode(ByteBuf buf) {
+  public void encode(io.netty.buffer.ByteBuf buf) {
     buf.writeLong(streamId);

Review Comment:
   The io.netty.buffer. prefix on ByteBuf is unnecessary, since ByteBuf is already imported.





[GitHub] [incubator-celeborn] zhongqiangczq commented on a diff in pull request #1221: [CELEBORN-234] Implement buffer stream.

Posted by "zhongqiangczq (via GitHub)" <gi...@apache.org>.
zhongqiangczq commented on code in PR #1221:
URL: https://github.com/apache/incubator-celeborn/pull/1221#discussion_r1108103423


##########
common/src/main/java/org/apache/celeborn/common/network/protocol/ReadData.java:
##########
@@ -14,48 +14,39 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.celeborn.common.network.protocol;
 
 import java.util.Objects;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 
+import org.apache.celeborn.common.network.buffer.NettyManagedBuffer;
+
+// This is buffer wrapper used in celeborn worker only
+// It doesn't need decode in worker.
 public class ReadData extends RequestMessage {
   private long streamId;
   private int backlog;
   private long offset;
-  private ByteBuf buf;
 
   public ReadData(long streamId, int backlog, long offset, ByteBuf buf) {
+    super(new NettyManagedBuffer(buf));
     this.streamId = streamId;
     this.backlog = backlog;
     this.offset = offset;
-    this.buf = buf;
   }
 
   @Override
   public int encodedLength() {
-    return 8 + 4 + 4 + 8 + 4 + buf.readableBytes();
+    return 8 + 4 + 8;
   }
 
   @Override
-  public void encode(ByteBuf buf) {
+  public void encode(io.netty.buffer.ByteBuf buf) {

Review Comment:
   The `io.netty.buffer.ByteBuf` prefix is unnecessary; `ByteBuf` is already imported in this file.





[GitHub] [incubator-celeborn] RexXiong commented on a diff in pull request #1221: [CELEBORN-234] Implement buffer stream.

Posted by "RexXiong (via GitHub)" <gi...@apache.org>.
RexXiong commented on code in PR #1221:
URL: https://github.com/apache/incubator-celeborn/pull/1221#discussion_r1109261275


##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala:
##########
@@ -86,6 +89,11 @@ class FetchHandler(val conf: TransportConf) extends BaseMessageHandler with Logg
 
   def handleOpenStream(client: TransportClient, request: RpcRequest): Unit = {
     val msg = Message.decode(request.body().nioByteBuffer())
+    if (msg.`type`() == Type.BACKLOG_ANNOUNCEMENT) {

Review Comment:
   Move this check out of the handleOpenStream method.
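
   One possible reading of this suggestion, sketched in Java rather than the handler's Scala: branch on the message type once at a single entry point, so that handleOpenStream only ever sees open-stream requests. Every name here (`FetchDispatchSketch`, `handleRpcRequest`, `handleBacklogAnnouncement`, the two-value `Type` enum) is hypothetical and only illustrates the shape; it is not the structure the PR ended up with.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical dispatcher; illustrates routing by message type only.
public class FetchDispatchSketch {
  // Assumed subset of message types, for illustration.
  public enum Type { OPEN_STREAM, BACKLOG_ANNOUNCEMENT }

  // Records which handler ran, so the routing can be observed.
  public final List<String> log = new ArrayList<>();

  // Single entry point: the type check lives here, not inside a handler.
  public void handleRpcRequest(Type type) {
    switch (type) {
      case OPEN_STREAM:
        handleOpenStream();
        break;
      case BACKLOG_ANNOUNCEMENT:
        handleBacklogAnnouncement();
        break;
      default:
        throw new IllegalArgumentException("Unsupported message type: " + type);
    }
  }

  // Handles open-stream requests only; no type branching inside.
  public void handleOpenStream() {
    log.add("open");
  }

  public void handleBacklogAnnouncement() {
    log.add("backlog");
  }

  public static void main(String[] args) {
    FetchDispatchSketch handler = new FetchDispatchSketch();
    handler.handleRpcRequest(Type.BACKLOG_ANNOUNCEMENT);
    handler.handleRpcRequest(Type.OPEN_STREAM);
    System.out.println(handler.log);
  }
}
```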





[GitHub] [incubator-celeborn] zhongqiangczq commented on a diff in pull request #1221: [CELEBORN-234] Implement buffer stream.

Posted by "zhongqiangczq (via GitHub)" <gi...@apache.org>.
zhongqiangczq commented on code in PR #1221:
URL: https://github.com/apache/incubator-celeborn/pull/1221#discussion_r1108112054


##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -757,6 +758,11 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
     get(WORKER_DIRECT_MEMORY_RATIO_FOR_SHUFFLE_STORAGE)
   def memoryPerResultPartition: String = get(MEMORY_PER_RESULT_PARTITION)
 
+  def partitionReadBuffersMin: Int = get(PARTITION_READ_BUFFERS_MIN)
+
+  def partitionReadBuffersMax: Int = get(PARTITION_READ_BUFFERS_MAX)
+  def bufferStreamThreadsPerMountpoint = get(BUFFERSTREAM_THREADS_PER_MOUNTPOINT)

Review Comment:
   Add an explicit return type: def bufferStreamThreadsPerMountpoint => def bufferStreamThreadsPerMountpoint: Int





[GitHub] [incubator-celeborn] zhongqiangczq commented on a diff in pull request #1221: [CELEBORN-234] Implement buffer stream.

Posted by "zhongqiangczq (via GitHub)" <gi...@apache.org>.
zhongqiangczq commented on code in PR #1221:
URL: https://github.com/apache/incubator-celeborn/pull/1221#discussion_r1108105376


##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -27,6 +27,7 @@ import scala.util.Try
 
 import org.apache.hadoop.security.UserGroupInformation
 
+import org.apache.celeborn.common.CelebornConf.{PARTITION_READ_BUFFERS_MAX, PARTITION_READ_BUFFERS_MIN}

Review Comment:
   This import is unnecessary.





[GitHub] [incubator-celeborn] FMX merged pull request #1221: [CELEBORN-234] Implement buffer stream.

Posted by "FMX (via GitHub)" <gi...@apache.org>.
FMX merged PR #1221:
URL: https://github.com/apache/incubator-celeborn/pull/1221

