You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2021/02/21 01:13:43 UTC
[flink] 03/04: [FLINK-21417][core] Re-abstract wrapping methods for
memory segments.
This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1d0f5e5bc4e4926554a2f381b96ee4d569de8af5
Author: Xintong Song <to...@gmail.com>
AuthorDate: Fri Feb 19 16:31:38 2021 +0800
[FLINK-21417][core] Re-abstract wrapping methods for memory segments.
---
.../flink/core/memory/DirectMemorySegment.java | 5 -----
.../flink/core/memory/HeapMemorySegment.java | 18 +++---------------
.../apache/flink/core/memory/MemorySegment.java | 22 +++++++++++++++++++---
.../flink/core/memory/OffHeapMemorySegment.java | 14 +-------------
.../flink/core/memory/MemorySegmentChecksTest.java | 15 +++++----------
5 files changed, 28 insertions(+), 46 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/DirectMemorySegment.java b/flink-core/src/main/java/org/apache/flink/core/memory/DirectMemorySegment.java
index b878f18..068887d 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/DirectMemorySegment.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/DirectMemorySegment.java
@@ -49,9 +49,4 @@ public final class DirectMemorySegment extends OffHeapMemorySegment {
DirectMemorySegment(@Nonnull ByteBuffer buffer, @Nullable Object owner) {
super(buffer, owner);
}
-
- @Override
- public ByteBuffer wrap(int offset, int length) {
- return wrapInternal(offset, length);
- }
}
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/HeapMemorySegment.java b/flink-core/src/main/java/org/apache/flink/core/memory/HeapMemorySegment.java
index 37787c8..808a5cd 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/HeapMemorySegment.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/HeapMemorySegment.java
@@ -25,8 +25,6 @@ import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
-import java.util.function.Consumer;
-import java.util.function.Function;
/**
* This class represents a piece of heap memory managed by Flink. The segment is backed by a byte
@@ -81,10 +79,10 @@ public final class HeapMemorySegment extends MemorySegment {
}
@Override
- public ByteBuffer wrap(int offset, int length) {
- try {
+ protected ByteBuffer wrapInternal(int offset, int length) {
+ if (!isFreed()) {
return ByteBuffer.wrap(this.memory, offset, length);
- } catch (NullPointerException e) {
+ } else {
throw new IllegalStateException("segment has been freed");
}
}
@@ -151,16 +149,6 @@ public final class HeapMemorySegment extends MemorySegment {
source.get(this.memory, offset, numBytes);
}
- @Override
- public <T> T processAsByteBuffer(Function<ByteBuffer, T> processFunction) {
- throw new UnsupportedOperationException("Unsupported because not needed atm.");
- }
-
- @Override
- public void processAsByteBuffer(Consumer<ByteBuffer> processConsumer) {
- throw new UnsupportedOperationException("Unsupported because not needed atm.");
- }
-
// -------------------------------------------------------------------------
// Factoring
// -------------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java
index c34357b..d726e22 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java
@@ -19,6 +19,7 @@
package org.apache.flink.core.memory;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
import java.io.DataInput;
import java.io.DataOutput;
@@ -277,7 +278,18 @@ public abstract class MemorySegment {
* @throws IndexOutOfBoundsException Thrown, if offset is negative or larger than the memory
* segment size, or if the offset plus the length is larger than the segment size.
*/
- public abstract ByteBuffer wrap(int offset, int length);
+ public ByteBuffer wrap(int offset, int length) {
+ return wrapInternal(offset, length);
+ }
+
+ /**
+ * This is an internal interface for wrapping the chunk of the underlying memory located between
+ * <tt>offset</tt> and <tt>offset + * length</tt> in a NIO ByteBuffer, without transferring the
+ * ownership of the memory.
+ *
+ * @see #wrap(int, int)
+ */
+ protected abstract ByteBuffer wrapInternal(int offset, int length);
/**
* Gets the owner of this memory segment. Returns null, if the owner was not set.
@@ -1576,7 +1588,9 @@ public abstract class MemorySegment {
* @param processFunction to be applied to the segment as {@link ByteBuffer}.
* @return the value that the process function returns.
*/
- public abstract <T> T processAsByteBuffer(Function<ByteBuffer, T> processFunction);
+ public final <T> T processAsByteBuffer(Function<ByteBuffer, T> processFunction) {
+ return Preconditions.checkNotNull(processFunction).apply(wrapInternal(0, size()));
+ }
/**
* Supplies a {@link ByteBuffer} that represents this entire segment to the given process
@@ -1588,5 +1602,7 @@ public abstract class MemorySegment {
*
* @param processConsumer to accept the segment as {@link ByteBuffer}.
*/
- public abstract void processAsByteBuffer(Consumer<ByteBuffer> processConsumer);
+ public final void processAsByteBuffer(Consumer<ByteBuffer> processConsumer) {
+ Preconditions.checkNotNull(processConsumer).accept(wrapInternal(0, size()));
+ }
}
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/OffHeapMemorySegment.java b/flink-core/src/main/java/org/apache/flink/core/memory/OffHeapMemorySegment.java
index 954e2cb..b5e41d3 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/OffHeapMemorySegment.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/OffHeapMemorySegment.java
@@ -19,14 +19,11 @@
package org.apache.flink.core.memory;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.util.Preconditions;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
-import java.util.function.Consumer;
-import java.util.function.Function;
import static org.apache.flink.core.memory.MemoryUtils.getByteBufferAddress;
@@ -73,6 +70,7 @@ public abstract class OffHeapMemorySegment extends MemorySegment {
offHeapBuffer = null; // to enable GC of unsafe memory
}
+ @Override
protected ByteBuffer wrapInternal(int offset, int length) {
if (!isFreed()) {
try {
@@ -87,14 +85,4 @@ public abstract class OffHeapMemorySegment extends MemorySegment {
throw new IllegalStateException("segment has been freed");
}
}
-
- @Override
- public <T> T processAsByteBuffer(Function<ByteBuffer, T> processFunction) {
- return Preconditions.checkNotNull(processFunction).apply(wrapInternal(0, size()));
- }
-
- @Override
- public void processAsByteBuffer(Consumer<ByteBuffer> processConsumer) {
- Preconditions.checkNotNull(processConsumer).accept(wrapInternal(0, size()));
- }
}
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentChecksTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentChecksTest.java
index df852fe..7d086e3 100644
--- a/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentChecksTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentChecksTest.java
@@ -23,8 +23,6 @@ import org.junit.Test;
import java.io.DataInput;
import java.io.DataOutput;
import java.nio.ByteBuffer;
-import java.util.function.Consumer;
-import java.util.function.Function;
/** Tests for the sanity checks of the memory segments. */
public class MemorySegmentChecksTest {
@@ -88,6 +86,11 @@ public class MemorySegmentChecksTest {
}
@Override
+ protected ByteBuffer wrapInternal(int offset, int length) {
+ return null;
+ }
+
+ @Override
public byte get(int index) {
return 0;
}
@@ -112,13 +115,5 @@ public class MemorySegmentChecksTest {
@Override
public void put(int offset, ByteBuffer source, int numBytes) {}
-
- @Override
- public <T> T processAsByteBuffer(Function<ByteBuffer, T> processFunction) {
- return null;
- }
-
- @Override
- public void processAsByteBuffer(Consumer<ByteBuffer> processConsumer) {}
}
}