You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by re...@apache.org on 2023/03/29 02:23:47 UTC
[incubator-celeborn] 32/42: [CELEBORN-428][FLINK] Remove unnecessary lock in PartitionSortedBuffer (#1352)
This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
commit 4b2cea27b1db9a05f4625b8b98bfd171fafb8521
Author: Keyong Zhou <zh...@apache.org>
AuthorDate: Thu Mar 16 11:56:23 2023 +0800
[CELEBORN-428][FLINK] Remove unnecessary lock in PartitionSortedBuffer (#1352)
---
.../plugin/flink/buffer/PartitionSortedBuffer.java | 130 ++++++++++-----------
1 file changed, 60 insertions(+), 70 deletions(-)
diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java
index a4184cacb..03beb5f9d 100644
--- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java
+++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java
@@ -59,7 +59,6 @@ public class PartitionSortedBuffer implements SortBuffer {
*/
private static final int INDEX_ENTRY_SIZE = 4 + 4 + 8;
- private final Object lock;
/** A buffer pool to request memory segments from. */
private final BufferPool bufferPool;
@@ -119,7 +118,6 @@ public class PartitionSortedBuffer implements SortBuffer {
@Nullable int[] customReadOrder) {
checkArgument(bufferSize > INDEX_ENTRY_SIZE, "Buffer size is too small.");
- this.lock = new Object();
this.bufferPool = checkNotNull(bufferPool);
this.bufferSize = bufferSize;
this.firstIndexEntryAddresses = new long[numSubpartitions];
@@ -230,19 +228,17 @@ public class PartitionSortedBuffer implements SortBuffer {
}
private void addBuffer(MemorySegment segment) {
- synchronized (lock) {
- if (segment.size() != bufferSize) {
- bufferPool.recycle(segment);
- throw new IllegalStateException("Illegal memory segment size.");
- }
-
- if (isReleased) {
- bufferPool.recycle(segment);
- throw new IllegalStateException("Sort buffer is already released.");
- }
+ if (segment.size() != bufferSize) {
+ bufferPool.recycle(segment);
+ throw new IllegalStateException("Illegal memory segment size.");
+ }
- buffers.add(segment);
+ if (isReleased) {
+ bufferPool.recycle(segment);
+ throw new IllegalStateException("Sort buffer is already released.");
}
+
+ buffers.add(segment);
}
private MemorySegment requestBufferFromPool() throws IOException {
@@ -271,57 +267,55 @@ public class PartitionSortedBuffer implements SortBuffer {
@Override
public BufferWithChannel copyIntoSegment(
MemorySegment target, BufferRecycler recycler, int offset) {
- synchronized (lock) {
- checkState(hasRemaining(), "No data remaining.");
- checkState(isFinished, "Should finish the sort buffer first before coping any data.");
- checkState(!isReleased, "Sort buffer is already released.");
+ checkState(hasRemaining(), "No data remaining.");
+ checkState(isFinished, "Should finish the sort buffer first before coping any data.");
+ checkState(!isReleased, "Sort buffer is already released.");
- int numBytesCopied = 0;
- DataType bufferDataType = DataType.DATA_BUFFER;
- int channelIndex = subpartitionReadOrder[readOrderIndex];
+ int numBytesCopied = 0;
+ DataType bufferDataType = DataType.DATA_BUFFER;
+ int channelIndex = subpartitionReadOrder[readOrderIndex];
- do {
- int sourceSegmentIndex = getSegmentIndexFromPointer(readIndexEntryAddress);
- int sourceSegmentOffset = getSegmentOffsetFromPointer(readIndexEntryAddress);
- MemorySegment sourceSegment = buffers.get(sourceSegmentIndex);
+ do {
+ int sourceSegmentIndex = getSegmentIndexFromPointer(readIndexEntryAddress);
+ int sourceSegmentOffset = getSegmentOffsetFromPointer(readIndexEntryAddress);
+ MemorySegment sourceSegment = buffers.get(sourceSegmentIndex);
- long lengthAndDataType = sourceSegment.getLong(sourceSegmentOffset);
- int length = getSegmentIndexFromPointer(lengthAndDataType);
- DataType dataType = DataType.values()[getSegmentOffsetFromPointer(lengthAndDataType)];
+ long lengthAndDataType = sourceSegment.getLong(sourceSegmentOffset);
+ int length = getSegmentIndexFromPointer(lengthAndDataType);
+ DataType dataType = DataType.values()[getSegmentOffsetFromPointer(lengthAndDataType)];
- // return the data read directly if the next to read is an event
- if (dataType.isEvent() && numBytesCopied > 0) {
- break;
- }
- bufferDataType = dataType;
+ // return the data read directly if the next to read is an event
+ if (dataType.isEvent() && numBytesCopied > 0) {
+ break;
+ }
+ bufferDataType = dataType;
- // get the next index entry address and move the read position forward
- long nextReadIndexEntryAddress = sourceSegment.getLong(sourceSegmentOffset + 8);
- sourceSegmentOffset += INDEX_ENTRY_SIZE;
+ // get the next index entry address and move the read position forward
+ long nextReadIndexEntryAddress = sourceSegment.getLong(sourceSegmentOffset + 8);
+ sourceSegmentOffset += INDEX_ENTRY_SIZE;
- // throws if the event is too big to be accommodated by a buffer.
- if (bufferDataType.isEvent() && target.size() < length) {
- throw new FlinkRuntimeException("Event is too big to be accommodated by a buffer");
- }
+ // throws if the event is too big to be accommodated by a buffer.
+ if (bufferDataType.isEvent() && target.size() - offset < length) {
+ throw new FlinkRuntimeException("Event is too big to be accommodated by a buffer");
+ }
+
+ numBytesCopied +=
+ copyRecordOrEvent(
+ target, numBytesCopied + offset, sourceSegmentIndex, sourceSegmentOffset, length);
- numBytesCopied +=
- copyRecordOrEvent(
- target, numBytesCopied + offset, sourceSegmentIndex, sourceSegmentOffset, length);
-
- if (recordRemainingBytes == 0) {
- // move to next channel if the current channel has been finished
- if (readIndexEntryAddress == lastIndexEntryAddresses[channelIndex]) {
- updateReadChannelAndIndexEntryAddress();
- break;
- }
- readIndexEntryAddress = nextReadIndexEntryAddress;
+ if (recordRemainingBytes == 0) {
+ // move to next channel if the current channel has been finished
+ if (readIndexEntryAddress == lastIndexEntryAddresses[channelIndex]) {
+ updateReadChannelAndIndexEntryAddress();
+ break;
}
- } while (numBytesCopied < target.size() - offset && bufferDataType.isBuffer());
+ readIndexEntryAddress = nextReadIndexEntryAddress;
+ }
+ } while (numBytesCopied < target.size() - offset && bufferDataType.isBuffer());
- numTotalBytesRead += numBytesCopied;
- Buffer buffer = new NetworkBuffer(target, recycler, bufferDataType, numBytesCopied + offset);
- return new BufferWithChannel(buffer, channelIndex);
- }
+ numTotalBytesRead += numBytesCopied;
+ Buffer buffer = new NetworkBuffer(target, recycler, bufferDataType, numBytesCopied + offset);
+ return new BufferWithChannel(buffer, channelIndex);
}
private int copyRecordOrEvent(
@@ -414,27 +408,23 @@ public class PartitionSortedBuffer implements SortBuffer {
@Override
public void release() {
// the sort buffer can be released by other threads
- synchronized (lock) {
- if (isReleased) {
- return;
- }
-
- isReleased = true;
+ if (isReleased) {
+ return;
+ }
- for (MemorySegment segment : buffers) {
- bufferPool.recycle(segment);
- }
- buffers.clear();
+ isReleased = true;
- numTotalBytes = 0;
- numTotalRecords = 0;
+ for (MemorySegment segment : buffers) {
+ bufferPool.recycle(segment);
}
+ buffers.clear();
+
+ numTotalBytes = 0;
+ numTotalRecords = 0;
}
@Override
public boolean isReleased() {
- synchronized (lock) {
- return isReleased;
- }
+ return isReleased;
}
}