You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2021/02/21 01:13:43 UTC

[flink] 03/04: [FLINK-21417][core] Re-abstract wrapping methods for memory segments.

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1d0f5e5bc4e4926554a2f381b96ee4d569de8af5
Author: Xintong Song <to...@gmail.com>
AuthorDate: Fri Feb 19 16:31:38 2021 +0800

    [FLINK-21417][core] Re-abstract wrapping methods for memory segments.
---
 .../flink/core/memory/DirectMemorySegment.java     |  5 -----
 .../flink/core/memory/HeapMemorySegment.java       | 18 +++---------------
 .../apache/flink/core/memory/MemorySegment.java    | 22 +++++++++++++++++++---
 .../flink/core/memory/OffHeapMemorySegment.java    | 14 +-------------
 .../flink/core/memory/MemorySegmentChecksTest.java | 15 +++++----------
 5 files changed, 28 insertions(+), 46 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/DirectMemorySegment.java b/flink-core/src/main/java/org/apache/flink/core/memory/DirectMemorySegment.java
index b878f18..068887d 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/DirectMemorySegment.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/DirectMemorySegment.java
@@ -49,9 +49,4 @@ public final class DirectMemorySegment extends OffHeapMemorySegment {
     DirectMemorySegment(@Nonnull ByteBuffer buffer, @Nullable Object owner) {
         super(buffer, owner);
     }
-
-    @Override
-    public ByteBuffer wrap(int offset, int length) {
-        return wrapInternal(offset, length);
-    }
 }
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/HeapMemorySegment.java b/flink-core/src/main/java/org/apache/flink/core/memory/HeapMemorySegment.java
index 37787c8..808a5cd 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/HeapMemorySegment.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/HeapMemorySegment.java
@@ -25,8 +25,6 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Objects;
-import java.util.function.Consumer;
-import java.util.function.Function;
 
 /**
  * This class represents a piece of heap memory managed by Flink. The segment is backed by a byte
@@ -81,10 +79,10 @@ public final class HeapMemorySegment extends MemorySegment {
     }
 
     @Override
-    public ByteBuffer wrap(int offset, int length) {
-        try {
+    protected ByteBuffer wrapInternal(int offset, int length) {
+        if (!isFreed()) {
             return ByteBuffer.wrap(this.memory, offset, length);
-        } catch (NullPointerException e) {
+        } else {
             throw new IllegalStateException("segment has been freed");
         }
     }
@@ -151,16 +149,6 @@ public final class HeapMemorySegment extends MemorySegment {
         source.get(this.memory, offset, numBytes);
     }
 
-    @Override
-    public <T> T processAsByteBuffer(Function<ByteBuffer, T> processFunction) {
-        throw new UnsupportedOperationException("Unsupported because not needed atm.");
-    }
-
-    @Override
-    public void processAsByteBuffer(Consumer<ByteBuffer> processConsumer) {
-        throw new UnsupportedOperationException("Unsupported because not needed atm.");
-    }
-
     // -------------------------------------------------------------------------
     //                             Factoring
     // -------------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java
index c34357b..d726e22 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java
@@ -19,6 +19,7 @@
 package org.apache.flink.core.memory;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -277,7 +278,18 @@ public abstract class MemorySegment {
      * @throws IndexOutOfBoundsException Thrown, if offset is negative or larger than the memory
      *     segment size, or if the offset plus the length is larger than the segment size.
      */
-    public abstract ByteBuffer wrap(int offset, int length);
+    public ByteBuffer wrap(int offset, int length) {
+        return wrapInternal(offset, length);
+    }
+
+    /**
+     * This is an internal interface for wrapping the chunk of the underlying memory located between
+     * <tt>offset</tt> and <tt>offset + * length</tt> in a NIO ByteBuffer, without transferring the
+     * ownership of the memory.
+     *
+     * @see #wrap(int, int)
+     */
+    protected abstract ByteBuffer wrapInternal(int offset, int length);
 
     /**
      * Gets the owner of this memory segment. Returns null, if the owner was not set.
@@ -1576,7 +1588,9 @@ public abstract class MemorySegment {
      * @param processFunction to be applied to the segment as {@link ByteBuffer}.
      * @return the value that the process function returns.
      */
-    public abstract <T> T processAsByteBuffer(Function<ByteBuffer, T> processFunction);
+    public final <T> T processAsByteBuffer(Function<ByteBuffer, T> processFunction) {
+        return Preconditions.checkNotNull(processFunction).apply(wrapInternal(0, size()));
+    }
 
     /**
      * Supplies a {@link ByteBuffer} that represents this entire segment to the given process
@@ -1588,5 +1602,7 @@ public abstract class MemorySegment {
      *
      * @param processConsumer to accept the segment as {@link ByteBuffer}.
      */
-    public abstract void processAsByteBuffer(Consumer<ByteBuffer> processConsumer);
+    public final void processAsByteBuffer(Consumer<ByteBuffer> processConsumer) {
+        Preconditions.checkNotNull(processConsumer).accept(wrapInternal(0, size()));
+    }
 }
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/OffHeapMemorySegment.java b/flink-core/src/main/java/org/apache/flink/core/memory/OffHeapMemorySegment.java
index 954e2cb..b5e41d3 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/OffHeapMemorySegment.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/OffHeapMemorySegment.java
@@ -19,14 +19,11 @@
 package org.apache.flink.core.memory;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.nio.ByteBuffer;
-import java.util.function.Consumer;
-import java.util.function.Function;
 
 import static org.apache.flink.core.memory.MemoryUtils.getByteBufferAddress;
 
@@ -73,6 +70,7 @@ public abstract class OffHeapMemorySegment extends MemorySegment {
         offHeapBuffer = null; // to enable GC of unsafe memory
     }
 
+    @Override
     protected ByteBuffer wrapInternal(int offset, int length) {
         if (!isFreed()) {
             try {
@@ -87,14 +85,4 @@ public abstract class OffHeapMemorySegment extends MemorySegment {
             throw new IllegalStateException("segment has been freed");
         }
     }
-
-    @Override
-    public <T> T processAsByteBuffer(Function<ByteBuffer, T> processFunction) {
-        return Preconditions.checkNotNull(processFunction).apply(wrapInternal(0, size()));
-    }
-
-    @Override
-    public void processAsByteBuffer(Consumer<ByteBuffer> processConsumer) {
-        Preconditions.checkNotNull(processConsumer).accept(wrapInternal(0, size()));
-    }
 }
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentChecksTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentChecksTest.java
index df852fe..7d086e3 100644
--- a/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentChecksTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentChecksTest.java
@@ -23,8 +23,6 @@ import org.junit.Test;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.nio.ByteBuffer;
-import java.util.function.Consumer;
-import java.util.function.Function;
 
 /** Tests for the sanity checks of the memory segments. */
 public class MemorySegmentChecksTest {
@@ -88,6 +86,11 @@ public class MemorySegmentChecksTest {
         }
 
         @Override
+        protected ByteBuffer wrapInternal(int offset, int length) {
+            return null;
+        }
+
+        @Override
         public byte get(int index) {
             return 0;
         }
@@ -112,13 +115,5 @@ public class MemorySegmentChecksTest {
 
         @Override
         public void put(int offset, ByteBuffer source, int numBytes) {}
-
-        @Override
-        public <T> T processAsByteBuffer(Function<ByteBuffer, T> processFunction) {
-            return null;
-        }
-
-        @Override
-        public void processAsByteBuffer(Consumer<ByteBuffer> processConsumer) {}
     }
 }