You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/03/16 03:56:29 UTC

[incubator-celeborn] branch main updated: [CELEBORN-428][FLINK] Remove unnecessary lock in PartitionSortedBuffer (#1352)

This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 637081f60 [CELEBORN-428][FLINK] Remove unnecessary lock in PartitionSortedBuffer (#1352)
637081f60 is described below

commit 637081f604547771a9f49129526affcdf405ebe5
Author: Keyong Zhou <zh...@apache.org>
AuthorDate: Thu Mar 16 11:56:23 2023 +0800

    [CELEBORN-428][FLINK] Remove unnecessary lock in PartitionSortedBuffer (#1352)
---
 .../plugin/flink/buffer/PartitionSortedBuffer.java | 130 ++++++++++-----------
 1 file changed, 60 insertions(+), 70 deletions(-)

diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java
index a4184cacb..03beb5f9d 100644
--- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java
+++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java
@@ -59,7 +59,6 @@ public class PartitionSortedBuffer implements SortBuffer {
    */
   private static final int INDEX_ENTRY_SIZE = 4 + 4 + 8;
 
-  private final Object lock;
   /** A buffer pool to request memory segments from. */
   private final BufferPool bufferPool;
 
@@ -119,7 +118,6 @@ public class PartitionSortedBuffer implements SortBuffer {
       @Nullable int[] customReadOrder) {
     checkArgument(bufferSize > INDEX_ENTRY_SIZE, "Buffer size is too small.");
 
-    this.lock = new Object();
     this.bufferPool = checkNotNull(bufferPool);
     this.bufferSize = bufferSize;
     this.firstIndexEntryAddresses = new long[numSubpartitions];
@@ -230,19 +228,17 @@ public class PartitionSortedBuffer implements SortBuffer {
   }
 
   private void addBuffer(MemorySegment segment) {
-    synchronized (lock) {
-      if (segment.size() != bufferSize) {
-        bufferPool.recycle(segment);
-        throw new IllegalStateException("Illegal memory segment size.");
-      }
-
-      if (isReleased) {
-        bufferPool.recycle(segment);
-        throw new IllegalStateException("Sort buffer is already released.");
-      }
+    if (segment.size() != bufferSize) {
+      bufferPool.recycle(segment);
+      throw new IllegalStateException("Illegal memory segment size.");
+    }
 
-      buffers.add(segment);
+    if (isReleased) {
+      bufferPool.recycle(segment);
+      throw new IllegalStateException("Sort buffer is already released.");
     }
+
+    buffers.add(segment);
   }
 
   private MemorySegment requestBufferFromPool() throws IOException {
@@ -271,57 +267,55 @@ public class PartitionSortedBuffer implements SortBuffer {
   @Override
   public BufferWithChannel copyIntoSegment(
       MemorySegment target, BufferRecycler recycler, int offset) {
-    synchronized (lock) {
-      checkState(hasRemaining(), "No data remaining.");
-      checkState(isFinished, "Should finish the sort buffer first before coping any data.");
-      checkState(!isReleased, "Sort buffer is already released.");
+    checkState(hasRemaining(), "No data remaining.");
+    checkState(isFinished, "Should finish the sort buffer first before coping any data.");
+    checkState(!isReleased, "Sort buffer is already released.");
 
-      int numBytesCopied = 0;
-      DataType bufferDataType = DataType.DATA_BUFFER;
-      int channelIndex = subpartitionReadOrder[readOrderIndex];
+    int numBytesCopied = 0;
+    DataType bufferDataType = DataType.DATA_BUFFER;
+    int channelIndex = subpartitionReadOrder[readOrderIndex];
 
-      do {
-        int sourceSegmentIndex = getSegmentIndexFromPointer(readIndexEntryAddress);
-        int sourceSegmentOffset = getSegmentOffsetFromPointer(readIndexEntryAddress);
-        MemorySegment sourceSegment = buffers.get(sourceSegmentIndex);
+    do {
+      int sourceSegmentIndex = getSegmentIndexFromPointer(readIndexEntryAddress);
+      int sourceSegmentOffset = getSegmentOffsetFromPointer(readIndexEntryAddress);
+      MemorySegment sourceSegment = buffers.get(sourceSegmentIndex);
 
-        long lengthAndDataType = sourceSegment.getLong(sourceSegmentOffset);
-        int length = getSegmentIndexFromPointer(lengthAndDataType);
-        DataType dataType = DataType.values()[getSegmentOffsetFromPointer(lengthAndDataType)];
+      long lengthAndDataType = sourceSegment.getLong(sourceSegmentOffset);
+      int length = getSegmentIndexFromPointer(lengthAndDataType);
+      DataType dataType = DataType.values()[getSegmentOffsetFromPointer(lengthAndDataType)];
 
-        // return the data read directly if the next to read is an event
-        if (dataType.isEvent() && numBytesCopied > 0) {
-          break;
-        }
-        bufferDataType = dataType;
+      // return the data read directly if the next to read is an event
+      if (dataType.isEvent() && numBytesCopied > 0) {
+        break;
+      }
+      bufferDataType = dataType;
 
-        // get the next index entry address and move the read position forward
-        long nextReadIndexEntryAddress = sourceSegment.getLong(sourceSegmentOffset + 8);
-        sourceSegmentOffset += INDEX_ENTRY_SIZE;
+      // get the next index entry address and move the read position forward
+      long nextReadIndexEntryAddress = sourceSegment.getLong(sourceSegmentOffset + 8);
+      sourceSegmentOffset += INDEX_ENTRY_SIZE;
 
-        // throws if the event is too big to be accommodated by a buffer.
-        if (bufferDataType.isEvent() && target.size() < length) {
-          throw new FlinkRuntimeException("Event is too big to be accommodated by a buffer");
-        }
+      // throws if the event is too big to be accommodated by a buffer.
+      if (bufferDataType.isEvent() && target.size() - offset < length) {
+        throw new FlinkRuntimeException("Event is too big to be accommodated by a buffer");
+      }
+
+      numBytesCopied +=
+          copyRecordOrEvent(
+              target, numBytesCopied + offset, sourceSegmentIndex, sourceSegmentOffset, length);
 
-        numBytesCopied +=
-            copyRecordOrEvent(
-                target, numBytesCopied + offset, sourceSegmentIndex, sourceSegmentOffset, length);
-
-        if (recordRemainingBytes == 0) {
-          // move to next channel if the current channel has been finished
-          if (readIndexEntryAddress == lastIndexEntryAddresses[channelIndex]) {
-            updateReadChannelAndIndexEntryAddress();
-            break;
-          }
-          readIndexEntryAddress = nextReadIndexEntryAddress;
+      if (recordRemainingBytes == 0) {
+        // move to next channel if the current channel has been finished
+        if (readIndexEntryAddress == lastIndexEntryAddresses[channelIndex]) {
+          updateReadChannelAndIndexEntryAddress();
+          break;
         }
-      } while (numBytesCopied < target.size() - offset && bufferDataType.isBuffer());
+        readIndexEntryAddress = nextReadIndexEntryAddress;
+      }
+    } while (numBytesCopied < target.size() - offset && bufferDataType.isBuffer());
 
-      numTotalBytesRead += numBytesCopied;
-      Buffer buffer = new NetworkBuffer(target, recycler, bufferDataType, numBytesCopied + offset);
-      return new BufferWithChannel(buffer, channelIndex);
-    }
+    numTotalBytesRead += numBytesCopied;
+    Buffer buffer = new NetworkBuffer(target, recycler, bufferDataType, numBytesCopied + offset);
+    return new BufferWithChannel(buffer, channelIndex);
   }
 
   private int copyRecordOrEvent(
@@ -414,27 +408,23 @@ public class PartitionSortedBuffer implements SortBuffer {
   @Override
   public void release() {
     // the sort buffer can be released by other threads
-    synchronized (lock) {
-      if (isReleased) {
-        return;
-      }
-
-      isReleased = true;
+    if (isReleased) {
+      return;
+    }
 
-      for (MemorySegment segment : buffers) {
-        bufferPool.recycle(segment);
-      }
-      buffers.clear();
+    isReleased = true;
 
-      numTotalBytes = 0;
-      numTotalRecords = 0;
+    for (MemorySegment segment : buffers) {
+      bufferPool.recycle(segment);
     }
+    buffers.clear();
+
+    numTotalBytes = 0;
+    numTotalRecords = 0;
   }
 
   @Override
   public boolean isReleased() {
-    synchronized (lock) {
-      return isReleased;
-    }
+    return isReleased;
   }
 }